diff --git a/src/dbnode/integration/client.go b/src/dbnode/integration/client.go index 841a2d772b..6e491f743f 100644 --- a/src/dbnode/integration/client.go +++ b/src/dbnode/integration/client.go @@ -67,6 +67,16 @@ func NewTChannelClient(name, address string) (*TestTChannelClient, error) { }, nil } +// Address returns the address. +func (client *TestTChannelClient) Address() string { + return client.address +} + +// Channel returns the TChannel channel. +func (client *TestTChannelClient) Channel() *tchannel.Channel { + return client.channel +} + // TChannelClientWrite writes a datapoint using a tchannel client. func (client *TestTChannelClient) TChannelClientWrite( timeout time.Duration, req *rpc.WriteRequest, diff --git a/src/dbnode/integration/setup.go b/src/dbnode/integration/setup.go index 6d043d5010..9caa48661d 100644 --- a/src/dbnode/integration/setup.go +++ b/src/dbnode/integration/setup.go @@ -149,6 +149,7 @@ type TestSetup interface { Scope() tally.TestScope M3DBClient() client.Client M3DBVerificationAdminClient() client.AdminClient + TChannelClient() *TestTChannelClient Namespaces() []namespace.Metadata TopologyInitializer() topology.Initializer SetTopologyInitializer(topology.Initializer) @@ -789,6 +790,10 @@ func (ts *testSetup) StopServer() error { return nil } +func (ts *testSetup) TChannelClient() *TestTChannelClient { + return ts.tchannelClient +} + func (ts *testSetup) WriteBatch(namespace ident.ID, seriesList generate.SeriesBlock) error { if ts.opts.UseTChannelClientForWriting() { return ts.tchannelClient.TChannelClientWriteBatch( diff --git a/src/dbnode/network/server/tchannelthrift/node/options.go b/src/dbnode/network/server/tchannelthrift/node/options.go index a00f953e20..a85e6df0ca 100644 --- a/src/dbnode/network/server/tchannelthrift/node/options.go +++ b/src/dbnode/network/server/tchannelthrift/node/options.go @@ -28,6 +28,19 @@ import ( "github.com/uber/tchannel-go/thrift" ) +// NewTChanChannelFn creates a tchan channel. +type NewTChanChannelFn func( + channelName string, + opts *tchannel.ChannelOptions, +) (*tchannel.Channel, error) + +func defaultTChanChannelFn( + channelName string, + opts *tchannel.ChannelOptions, +) (*tchannel.Channel, error) { + return tchannel.NewChannel(channelName, opts) +} + // NewTChanNodeServerFn creates a tchan node server. type NewTChanNodeServerFn func( service Service, @@ -49,6 +62,12 @@ type Options interface { // ChannelOptions returns the tchan channel options. ChannelOptions() *tchannel.ChannelOptions + // SetTChanChannelFn sets a tchan node channel registration. + SetTChanChannelFn(value NewTChanChannelFn) Options + + // TChanChannelFn returns a tchan node channel registration. + TChanChannelFn() NewTChanChannelFn + // SetTChanNodeServerFn sets a tchan node server builder. SetTChanNodeServerFn(value NewTChanNodeServerFn) Options @@ -65,6 +84,7 @@ type Options interface { type options struct { channelOptions *tchannel.ChannelOptions instrumentOpts instrument.Options + tchanChannelFn NewTChanChannelFn tchanNodeServerFn NewTChanNodeServerFn } @@ -72,6 +92,7 @@ type options struct { func NewOptions(chanOpts *tchannel.ChannelOptions) Options { return &options{ channelOptions: chanOpts, + tchanChannelFn: defaultTChanChannelFn, tchanNodeServerFn: defaultTChanNodeServerFn, } } @@ -85,6 +106,16 @@ func (o *options) ChannelOptions() *tchannel.ChannelOptions { return o.channelOptions } +func (o *options) SetTChanChannelFn(value NewTChanChannelFn) Options { + opts := *o + opts.tchanChannelFn = value + return &opts +} + +func (o *options) TChanChannelFn() NewTChanChannelFn { + return o.tchanChannelFn +} + func (o *options) SetTChanNodeServerFn(value NewTChanNodeServerFn) Options { opts := *o opts.tchanNodeServerFn = value diff --git a/src/dbnode/network/server/tchannelthrift/node/server.go b/src/dbnode/network/server/tchannelthrift/node/server.go index 91f560fa92..de6e8858c7 100644 --- a/src/dbnode/network/server/tchannelthrift/node/server.go +++ b/src/dbnode/network/server/tchannelthrift/node/server.go @@ -59,7 +59,7 @@ func (s *server) ListenAndServe() (ns.Close, error) { immutableOpts := *chanOpts opts = &immutableOpts } - channel, err := tchannel.NewChannel(channel.ChannelName, opts) + channel, err := s.opts.TChanChannelFn()(channel.ChannelName, opts) if err != nil { return nil, err } diff --git a/src/dbnode/server/options.go b/src/dbnode/server/options.go index de17ff5f76..85889c860b 100644 --- a/src/dbnode/server/options.go +++ b/src/dbnode/server/options.go @@ -27,4 +27,5 @@ import ( // StorageOptions are options to apply to the database storage options. type StorageOptions struct { TChanNodeServerFn node.NewTChanNodeServerFn + TChanChannelFn node.NewTChanChannelFn } diff --git a/src/dbnode/server/server.go b/src/dbnode/server/server.go index 72422ee912..52df3307c7 100644 --- a/src/dbnode/server/server.go +++ b/src/dbnode/server/server.go @@ -715,6 +715,9 @@ func Run(runOpts RunOptions) { } tchanOpts := ttnode.NewOptions(tchannelOpts). SetInstrumentOptions(opts.InstrumentOptions()) + if fn := runOpts.StorageOptions.TChanChannelFn; fn != nil { + tchanOpts = tchanOpts.SetTChanChannelFn(fn) + } if fn := runOpts.StorageOptions.TChanNodeServerFn; fn != nil { tchanOpts = tchanOpts.SetTChanNodeServerFn(fn) }