diff --git a/src/dbnode/client/client_mock.go b/src/dbnode/client/client_mock.go index 3a9842dceb..0eddc159c9 100644 --- a/src/dbnode/client/client_mock.go +++ b/src/dbnode/client/client_mock.go @@ -2224,6 +2224,34 @@ func (mr *MockOptionsMockRecorder) HostQueueOpsArrayPoolSize() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HostQueueOpsArrayPoolSize", reflect.TypeOf((*MockOptions)(nil).HostQueueOpsArrayPoolSize)) } +// SetHostQueueEmitsHealthStatus mocks base method +func (m *MockOptions) SetHostQueueEmitsHealthStatus(value bool) Options { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetHostQueueEmitsHealthStatus", value) + ret0, _ := ret[0].(Options) + return ret0 +} + +// SetHostQueueEmitsHealthStatus indicates an expected call of SetHostQueueEmitsHealthStatus +func (mr *MockOptionsMockRecorder) SetHostQueueEmitsHealthStatus(value interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetHostQueueEmitsHealthStatus", reflect.TypeOf((*MockOptions)(nil).SetHostQueueEmitsHealthStatus), value) +} + +// HostQueueEmitsHealthStatus mocks base method +func (m *MockOptions) HostQueueEmitsHealthStatus() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "HostQueueEmitsHealthStatus") + ret0, _ := ret[0].(bool) + return ret0 +} + +// HostQueueEmitsHealthStatus indicates an expected call of HostQueueEmitsHealthStatus +func (mr *MockOptionsMockRecorder) HostQueueEmitsHealthStatus() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HostQueueEmitsHealthStatus", reflect.TypeOf((*MockOptions)(nil).HostQueueEmitsHealthStatus)) +} + // SetSeriesIteratorPoolSize mocks base method func (m *MockOptions) SetSeriesIteratorPoolSize(value int) Options { m.ctrl.T.Helper() @@ -3731,6 +3759,34 @@ func (mr *MockAdminOptionsMockRecorder) HostQueueOpsArrayPoolSize() *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HostQueueOpsArrayPoolSize", reflect.TypeOf((*MockAdminOptions)(nil).HostQueueOpsArrayPoolSize)) } +// SetHostQueueEmitsHealthStatus mocks base method +func (m *MockAdminOptions) SetHostQueueEmitsHealthStatus(value bool) Options { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetHostQueueEmitsHealthStatus", value) + ret0, _ := ret[0].(Options) + return ret0 +} + +// SetHostQueueEmitsHealthStatus indicates an expected call of SetHostQueueEmitsHealthStatus +func (mr *MockAdminOptionsMockRecorder) SetHostQueueEmitsHealthStatus(value interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetHostQueueEmitsHealthStatus", reflect.TypeOf((*MockAdminOptions)(nil).SetHostQueueEmitsHealthStatus), value) +} + +// HostQueueEmitsHealthStatus mocks base method +func (m *MockAdminOptions) HostQueueEmitsHealthStatus() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "HostQueueEmitsHealthStatus") + ret0, _ := ret[0].(bool) + return ret0 +} + +// HostQueueEmitsHealthStatus indicates an expected call of HostQueueEmitsHealthStatus +func (mr *MockAdminOptionsMockRecorder) HostQueueEmitsHealthStatus() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HostQueueEmitsHealthStatus", reflect.TypeOf((*MockAdminOptions)(nil).HostQueueEmitsHealthStatus)) +} + // SetSeriesIteratorPoolSize mocks base method func (m *MockAdminOptions) SetSeriesIteratorPoolSize(value int) Options { m.ctrl.T.Helper() diff --git a/src/dbnode/client/connection_pool.go b/src/dbnode/client/connection_pool.go index bcfc04973c..937408673d 100644 --- a/src/dbnode/client/connection_pool.go +++ b/src/dbnode/client/connection_pool.go @@ -32,8 +32,9 @@ import ( "github.com/m3db/m3/src/dbnode/generated/thrift/rpc" "github.com/m3db/m3/src/dbnode/topology" xclose "github.com/m3db/m3/src/x/close" - "github.com/m3db/stackmurmur3/v2" + murmur3 "github.com/m3db/stackmurmur3/v2" + "github.com/uber-go/tally" "github.com/uber/tchannel-go/thrift" "go.uber.org/zap" ) @@ -63,6 +64,7 @@ type connPool struct { sleepHealth sleepFn sleepHealthRetry sleepFn status status + healthStatus tally.Gauge } type conn struct { @@ -94,6 +96,7 @@ func newConnectionPool(host topology.Host, opts Options) connectionPool { sleepConnect: time.Sleep, sleepHealth: time.Sleep, sleepHealthRetry: time.Sleep, + healthStatus: opts.InstrumentOptions().MetricsScope().Gauge("health-status"), } return p @@ -186,11 +189,13 @@ func (p *connPool) connectEvery(interval time.Duration, stutter time.Duration) { // Health check the connection if err := p.healthCheckNewConn(client, p.opts); err != nil { + p.maybeEmitHealthStatus(healthStatusCheckFailed) log.Debug("could not connect, failed health check", zap.String("host", address), zap.Error(err)) channel.Close() return } + p.maybeEmitHealthStatus(healthStatusOK) p.Lock() if p.status == statusOpen { p.pool = append(p.pool, conn{channel, client}) @@ -206,6 +211,12 @@ func (p *connPool) connectEvery(interval time.Duration, stutter time.Duration) { } } +func (p *connPool) maybeEmitHealthStatus(hs healthStatus) { + if p.opts.HostQueueEmitsHealthStatus() { + p.healthStatus.Update(float64(hs)) + } +} + func (p *connPool) healthCheckEvery(interval time.Duration, stutter time.Duration) { log := p.opts.InstrumentOptions().Logger() nowFn := p.opts.ClockOptions().NowFn() diff --git a/src/dbnode/client/options.go b/src/dbnode/client/options.go index 6463d4087e..6b06004048 100644 --- a/src/dbnode/client/options.go +++ b/src/dbnode/client/options.go @@ -119,6 +119,9 @@ const ( // defaultHostQueueOpsArrayPoolSize is the default host queue ops array pool size defaultHostQueueOpsArrayPoolSize = 8 + // defaultHostQueueEmitsHealthStatus is false + defaultHostQueueEmitsHealthStatus = false + // defaultBackgroundConnectInterval is the default background connect interval defaultBackgroundConnectInterval = 4 * time.Second @@ -261,6 +264,7 @@ type options struct { hostQueueOpsFlushSize int hostQueueOpsFlushInterval time.Duration hostQueueOpsArrayPoolSize int + hostQueueEmitsHealthStatus bool seriesIteratorPoolSize int seriesIteratorArrayPoolBuckets []pool.Bucket checkedBytesWrapperPoolSize int @@ -381,6 +385,7 @@ func newOptions() *options { hostQueueOpsFlushSize: defaultHostQueueOpsFlushSize, hostQueueOpsFlushInterval: defaultHostQueueOpsFlushInterval, hostQueueOpsArrayPoolSize: defaultHostQueueOpsArrayPoolSize, + hostQueueEmitsHealthStatus: defaultHostQueueEmitsHealthStatus, seriesIteratorPoolSize: defaultSeriesIteratorPoolSize, seriesIteratorArrayPoolBuckets: defaultSeriesIteratorArrayPoolBuckets, checkedBytesWrapperPoolSize: defaultCheckedBytesWrapperPoolSize, @@ -884,6 +889,16 @@ func (o *options) HostQueueOpsArrayPoolSize() int { return o.hostQueueOpsArrayPoolSize } +func (o *options) SetHostQueueEmitsHealthStatus(value bool) Options { + opts := *o + opts.hostQueueEmitsHealthStatus = value + return &opts +} + +func (o *options) HostQueueEmitsHealthStatus() bool { + return o.hostQueueEmitsHealthStatus +} + func (o *options) SetSeriesIteratorPoolSize(value int) Options { opts := *o opts.seriesIteratorPoolSize = value diff --git a/src/dbnode/client/types.go b/src/dbnode/client/types.go index 3c4ba2ed03..ccbd574f94 100644 --- a/src/dbnode/client/types.go +++ b/src/dbnode/client/types.go @@ -515,6 +515,12 @@ type Options interface { // HostQueueOpsArrayPoolSize returns the hostQueueOpsArrayPoolSize. HostQueueOpsArrayPoolSize() int + // SetHostQueueEmitsHealthStatus sets the hostQueueEmitHealthStatus. + SetHostQueueEmitsHealthStatus(value bool) Options + + // HostQueueEmitsHealthStatus returns the hostQueueEmitHealthStatus. + HostQueueEmitsHealthStatus() bool + // SetSeriesIteratorPoolSize sets the seriesIteratorPoolSize. SetSeriesIteratorPoolSize(value int) Options @@ -710,6 +716,13 @@ const ( statusClosed ) +type healthStatus int + +const ( + healthStatusCheckFailed healthStatus = iota + healthStatusOK +) + type op interface { // Size returns the effective size of inner operations. Size() int