Skip to content

Commit

Permalink
Honor namespace-level "WritesToCommitlog" configuration (#1232)
Browse files Browse the repository at this point in the history
* Make database honor WritesToCommitlog on namespace options and add tests for WriteBatch and WriteTaggedBatch

* fix broken test

* fix broken test

* Add test for WriteTagged
  • Loading branch information
richardartoul authored Dec 6, 2018
1 parent 90f21b2 commit 458cf4a
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 3 deletions.
12 changes: 12 additions & 0 deletions src/dbnode/storage/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,10 @@ func (d *db) Write(
return err
}

if !n.Options().WritesToCommitLog() {
return nil
}

dp := ts.Datapoint{Timestamp: timestamp, Value: value}
return d.commitLog.Write(ctx, series, dp, unit, annotation)
}
Expand Down Expand Up @@ -540,6 +544,10 @@ func (d *db) WriteTagged(
return err
}

if !n.Options().WritesToCommitLog() {
return nil
}

dp := ts.Datapoint{Timestamp: timestamp, Value: value}
return d.commitLog.Write(ctx, series, dp, unit, annotation)
}
Expand Down Expand Up @@ -644,6 +652,10 @@ func (d *db) writeBatch(
writes.SetOutcome(i, series, err)
}

if !n.Options().WritesToCommitLog() {
return nil
}

return d.commitLog.WriteBatch(ctx, writes)
}

Expand Down
52 changes: 49 additions & 3 deletions src/dbnode/storage/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,14 @@ func TestDatabaseUpdateNamespace(t *testing.T) {
}

func TestDatabaseNamespaceIndexFunctions(t *testing.T) {
testDatabaseNamespaceIndexFunctions(t, true)
}

func TestDatabaseNamespaceIndexFunctionsNoCommitlog(t *testing.T) {
testDatabaseNamespaceIndexFunctions(t, false)
}

func testDatabaseNamespaceIndexFunctions(t *testing.T, commitlogEnabled bool) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

Expand All @@ -606,10 +614,22 @@ func TestDatabaseNamespaceIndexFunctions(t *testing.T) {
close(mapCh)
}()

commitlog := d.commitLog
if !commitlogEnabled {
// We don't mock the commitlog so set this to nil to ensure its
// not being used as the test will panic if any methods are called
// on it.
d.commitLog = nil
}

ns := dbAddNewMockNamespace(ctrl, d, "testns")
nsOptions := namespace.NewOptions().
SetWritesToCommitLog(commitlogEnabled)

ns.EXPECT().GetOwnedShards().Return([]databaseShard{}).AnyTimes()
ns.EXPECT().Tick(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
ns.EXPECT().BootstrapState().Return(ShardBootstrapStates{}).AnyTimes()
ns.EXPECT().Options().Return(nsOptions).AnyTimes()
require.NoError(t, d.Open())

var (
Expand Down Expand Up @@ -651,6 +671,9 @@ func TestDatabaseNamespaceIndexFunctions(t *testing.T) {
require.Error(t, err)

ns.EXPECT().Close().Return(nil)

// Ensure commitlog is set before closing because this will call commitlog.Close()
d.commitLog = commitlog
require.NoError(t, d.Close())
}

Expand Down Expand Up @@ -701,11 +724,19 @@ func TestDatabaseWriteTaggedBatchNoNamespace(t *testing.T) {
}

func TestDatabaseWriteBatch(t *testing.T) {
testDatabaseWriteBatch(t, false)
testDatabaseWriteBatch(t, false, true)
}

func TestDatabaseWriteTaggedBatch(t *testing.T) {
testDatabaseWriteBatch(t, true)
testDatabaseWriteBatch(t, true, true)
}

func TestDatabaseWriteBatchNoCommitlog(t *testing.T) {
testDatabaseWriteBatch(t, false, false)
}

func TestDatabaseWriteTaggedBatchNoCommitlog(t *testing.T) {
testDatabaseWriteBatch(t, true, false)
}

type fakeIndexedErrorHandler struct {
Expand All @@ -721,7 +752,7 @@ type indexedErr struct {
err error
}

func testDatabaseWriteBatch(t *testing.T, tagged bool) {
func testDatabaseWriteBatch(t *testing.T, tagged bool, commitlogEnabled bool) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

Expand All @@ -730,10 +761,22 @@ func testDatabaseWriteBatch(t *testing.T, tagged bool) {
close(mapCh)
}()

commitlog := d.commitLog
if !commitlogEnabled {
// We don't mock the commitlog so set this to nil to ensure its
// not being used as the test will panic if any methods are called
// on it.
d.commitLog = nil
}

ns := dbAddNewMockNamespace(ctrl, d, "testns")
nsOptions := namespace.NewOptions().
SetWritesToCommitLog(commitlogEnabled)

ns.EXPECT().GetOwnedShards().Return([]databaseShard{}).AnyTimes()
ns.EXPECT().Tick(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
ns.EXPECT().BootstrapState().Return(ShardBootstrapStates{}).AnyTimes()
ns.EXPECT().Options().Return(nsOptions).AnyTimes()
ns.EXPECT().Close().Return(nil).Times(1)
require.NoError(t, d.Open())

Expand Down Expand Up @@ -817,6 +860,9 @@ func testDatabaseWriteBatch(t *testing.T, tagged bool) {
// Make sure it calls the error handler with the "original" provided index, not the position
// of the write in the WriteBatch slice.
require.Equal(t, (i-1)*2, errHandler.errs[0].index)

// Ensure commitlog is set before closing because this will call commitlog.Close()
d.commitLog = commitlog
require.NoError(t, d.Close())
}

Expand Down

0 comments on commit 458cf4a

Please sign in to comment.