Skip to content

Commit

Permalink
Added replicated session tests
Browse files Browse the repository at this point in the history
  • Loading branch information
simonrobb committed Aug 20, 2019
1 parent b787b74 commit 63c55c4
Show file tree
Hide file tree
Showing 8 changed files with 356 additions and 2,724 deletions.
4 changes: 2 additions & 2 deletions src/dbnode/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func NewAdminClient(opts AdminMultiClusterOptions) (AdminClient, error) {
}

func newClient(opts MultiClusterOptions) (*client, error) {
if err := opts.Validate(); err != nil {
if err := opts.Options().Validate(); err != nil {
return nil, err
}
return &client{opts: opts, newSessionFn: newReplicatedSession}, nil
Expand Down Expand Up @@ -89,7 +89,7 @@ func (c *client) defaultSession() (AdminSession, error) {
return session, nil
}

func (c *client) Options() Options {
func (c *client) Options() MultiClusterOptions {
return c.opts
}

Expand Down
2,778 changes: 87 additions & 2,691 deletions src/dbnode/client/client_mock.go

Large diffs are not rendered by default.

14 changes: 9 additions & 5 deletions src/dbnode/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@ import (
)

func testClient(t *testing.T, ctrl *gomock.Controller) Client {
opts := NewMockMultiClusterOptions(ctrl)
multiOpts := NewMockMultiClusterOptions(ctrl)
opts := NewMockOptions(ctrl)
multiOpts.EXPECT().Options().Return(opts)
opts.EXPECT().Validate().Return(nil)

client, err := NewClient(opts)
client, err := NewClient(multiOpts)
assert.NoError(t, err)
assert.NotNil(t, client)

Expand All @@ -48,10 +50,12 @@ func TestClientNewClientValidatesOptions(t *testing.T) {
testClient(t, ctrl)

anError := fmt.Errorf("an error")
opts := NewMockMultiClusterOptions(ctrl)
opts.EXPECT().Validate().Return(anError)
multiOpts := NewMockMultiClusterOptions(ctrl)
opts := NewMockOptions(ctrl)
multiOpts.EXPECT().Options().Return(opts)
opts.EXPECT().Validate().Return(nil)

_, err := NewClient(opts)
_, err := NewClient(multiOpts)
assert.Error(t, err)
assert.Equal(t, anError, err)
}
Expand Down
9 changes: 6 additions & 3 deletions src/dbnode/client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,7 @@ func (c Configuration) NewAdminClient(
}
}

v := NewAdminMultiClusterOptions().
SetAsyncTopologyInitializers(asyncTopoInits).
v := NewAdminOptions().
SetTopologyInitializer(topoInit).
SetChannelOptions(xtchannel.NewDefaultChannelOptions()).
SetInstrumentOptions(iopts)
Expand Down Expand Up @@ -349,8 +348,12 @@ func (c Configuration) NewAdminClient(
// v = v.SetSchemaRegistry(schemaRegistry)
// }

u := NewAdminMultiClusterOptions().
SetOptions(v).
SetAsyncTopologyInitializers(asyncTopoInits)

// Apply programtic custom options last
opts := v.(AdminMultiClusterOptions)
opts := u.(AdminMultiClusterOptions)
for _, opt := range custom {
opts = opt(opts)
}
Expand Down
19 changes: 15 additions & 4 deletions src/dbnode/client/multi_cluster_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,36 @@ var (
var _ MultiClusterOptions = (*multiClusterOptions)(nil)

type multiClusterOptions struct {
options
options Options
asyncTopologyInitializers []topology.Initializer
}

// NewMultiClusterOptions creates a new set of multi cluster options
func NewMultiClusterOptions() MultiClusterOptions {
return &multiClusterOptions{
options: *newOptions(),
options: newOptions(),
asyncTopologyInitializers: []topology.Initializer{},
}
}

// NewAdminMultiClusterOptions creates a new set of administration multi cluster options
func NewAdminMultiClusterOptions() AdminMultiClusterOptions {
return &multiClusterOptions{
options: *newOptions(),
options: newOptions(),
asyncTopologyInitializers: []topology.Initializer{},
}
}

func (o *multiClusterOptions) SetOptions(value Options) MultiClusterOptions {
opts := *o
opts.options = value
return &opts
}

func (o *multiClusterOptions) Options() Options {
return o.options
}

func (o *multiClusterOptions) SetAsyncTopologyInitializers(value []topology.Initializer) MultiClusterOptions {
opts := *o
opts.asyncTopologyInitializers = value
Expand All @@ -46,7 +56,8 @@ func (o *multiClusterOptions) AsyncTopologyInitializers() []topology.Initializer
func (o *multiClusterOptions) OptionsForAsyncClusters() []Options {
result := make([]Options, 0, len(o.asyncTopologyInitializers))
for _, topoInit := range o.asyncTopologyInitializers {
result = append(result, o.SetTopologyInitializer(topoInit))
options := o.Options().SetTopologyInitializer(topoInit)
result = append(result, options)
}
return result
}
25 changes: 17 additions & 8 deletions src/dbnode/client/replicated_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type replicatedSession struct {
scope tally.Scope
log *zap.Logger
metrics replicatedSessionMetrics
outCh chan error
}

type replicatedSessionMetrics struct {
Expand Down Expand Up @@ -84,21 +85,22 @@ func newReplicatedSession(opts MultiClusterOptions, options ...replicatedSession
workerPool := m3sync.NewWorkerPool(maxReplicationConcurrency)
workerPool.Init()

scope := opts.InstrumentOptions().MetricsScope()
scope := opts.Options().InstrumentOptions().MetricsScope()

session := replicatedSession{
workerPool: workerPool,
scope: scope,
log: opts.InstrumentOptions().Logger(),
log: opts.Options().InstrumentOptions().Logger(),
metrics: newReplicatedSessionMetrics(scope),
outCh: make(chan error),
}

// Apply options
for _, option := range options {
option(&session)
}

if err := session.setSession(opts); err != nil {
if err := session.setSession(opts.Options()); err != nil {
return nil, err
}
if err := session.setAsyncSessions(opts.OptionsForAsyncClusters()); err != nil {
Expand All @@ -110,7 +112,11 @@ func newReplicatedSession(opts MultiClusterOptions, options ...replicatedSession

type writeFunc func(Session) error

func (s replicatedSession) setSession(opts Options) error {
func (s *replicatedSession) setSession(opts Options) error {
if opts.TopologyInitializer() == nil {
return nil
}

session, err := s.newSessionFn(opts)
if err != nil {
return err
Expand All @@ -119,7 +125,7 @@ func (s replicatedSession) setSession(opts Options) error {
return nil
}

func (s replicatedSession) setAsyncSessions(opts []Options) error {
func (s *replicatedSession) setAsyncSessions(opts []Options) error {
sessions := make([]clientSession, 0, len(opts))
for _, oo := range opts {
session, err := s.newSessionFn(oo)
Expand All @@ -134,15 +140,18 @@ func (s replicatedSession) setAsyncSessions(opts []Options) error {

func (s replicatedSession) replicate(fn writeFunc) error {
for _, asyncSession := range s.asyncSessions {
asyncSession := asyncSession // capture var
if s.workerPool.GoIfAvailable(func() {
if err := fn(asyncSession); err != nil {
err := fn(asyncSession)
if err != nil {
s.metrics.replicateError.Inc(1)
s.log.Error("could not replicate write: %v", zap.Error(err))
}
s.outCh <- err
}) {
s.metrics.replicateNotExecuted.Inc(1)
} else {
s.metrics.replicateExecuted.Inc(1)
} else {
s.metrics.replicateNotExecuted.Inc(1)
}
}
return fn(s.session)
Expand Down
Loading

0 comments on commit 63c55c4

Please sign in to comment.