diff --git a/src/msg/consumer/config.go b/src/msg/consumer/config.go index 17222a5d2a..9ef1c99620 100644 --- a/src/msg/consumer/config.go +++ b/src/msg/consumer/config.go @@ -37,6 +37,7 @@ type Configuration struct { AckBufferSize *int `yaml:"ackBufferSize"` ConnectionWriteBufferSize *int `yaml:"connectionWriteBufferSize"` ConnectionReadBufferSize *int `yaml:"connectionReadBufferSize"` + ConnectionWriteTimeout *time.Duration `yaml:"connectionWriteTimeout"` } // MessagePoolConfiguration is the message pool configuration @@ -92,5 +93,8 @@ func (c *Configuration) NewOptions(iOpts instrument.Options) Options { if c.ConnectionReadBufferSize != nil { opts = opts.SetConnectionReadBufferSize(*c.ConnectionReadBufferSize) } + if c.ConnectionWriteTimeout != nil { + opts = opts.SetConnectionWriteTimeout(*c.ConnectionWriteTimeout) + } return opts } diff --git a/src/msg/consumer/consumer.go b/src/msg/consumer/consumer.go index d23a8b273b..5204ac97a3 100644 --- a/src/msg/consumer/consumer.go +++ b/src/msg/consumer/consumer.go @@ -27,6 +27,7 @@ import ( "github.com/m3db/m3/src/msg/generated/proto/msgpb" "github.com/m3db/m3/src/msg/protocol/proto" + "github.com/m3db/m3/src/x/clock" xio "github.com/m3db/m3/src/x/io" "github.com/uber-go/tally" @@ -125,7 +126,7 @@ func newConsumer( decoder: proto.NewDecoder( conn, opts.DecoderOptions(), opts.ConnectionReadBufferSize(), ), - w: writerFn(conn, wOpts), + w: writerFn(newConnWithTimeout(conn, opts.ConnectionWriteTimeout(), time.Now), wOpts), conn: conn, closed: false, doneCh: make(chan struct{}), @@ -263,3 +264,25 @@ func resetProto(m *msgpb.Message) { m.Metadata.Shard = 0 m.Value = m.Value[:0] } + +type connWithTimeout struct { + net.Conn + + timeout time.Duration + nowFn clock.NowFn +} + +func newConnWithTimeout(conn net.Conn, timeout time.Duration, nowFn clock.NowFn) connWithTimeout { + return connWithTimeout{ + Conn: conn, + timeout: timeout, + nowFn: nowFn, + } +} + +func (conn connWithTimeout) Write(p []byte) (int, error) { + if conn.timeout > 0 { + conn.SetWriteDeadline(conn.nowFn().Add(conn.timeout)) + } + return conn.Conn.Write(p) +} diff --git a/src/msg/consumer/options.go b/src/msg/consumer/options.go index f629a4c93d..ae21bd1fd7 100644 --- a/src/msg/consumer/options.go +++ b/src/msg/consumer/options.go @@ -33,6 +33,8 @@ var ( defaultAckBufferSize = 1048576 defaultAckFlushInterval = 200 * time.Millisecond defaultConnectionBufferSize = 1048576 + // TODO(ryanhall07): set this to 5s once we verify this works. + defaultWriteTimeout = 0 * time.Second ) type options struct { @@ -43,6 +45,7 @@ type options struct { ackBufferSize int writeBufferSize int readBufferSize int + writeTimeout time.Duration iOpts instrument.Options rwOpts xio.Options } @@ -57,6 +60,7 @@ func NewOptions() Options { ackBufferSize: defaultAckBufferSize, writeBufferSize: defaultConnectionBufferSize, readBufferSize: defaultConnectionBufferSize, + writeTimeout: defaultWriteTimeout, iOpts: instrument.NewOptions(), rwOpts: xio.NewOptions(), } @@ -132,6 +136,16 @@ func (opts *options) SetConnectionReadBufferSize(value int) Options { return &o } +func (opts *options) ConnectionWriteTimeout() time.Duration { + return opts.writeTimeout +} + +func (opts *options) SetConnectionWriteTimeout(value time.Duration) Options { + o := *opts + o.writeTimeout = value + return &o +} + func (opts *options) InstrumentOptions() instrument.Options { return opts.iOpts } diff --git a/src/msg/consumer/types.go b/src/msg/consumer/types.go index ad586d3127..5ef69d9336 100644 --- a/src/msg/consumer/types.go +++ b/src/msg/consumer/types.go @@ -106,6 +106,12 @@ type Options interface { // SetConnectionWriteBufferSize sets the buffer size. SetConnectionReadBufferSize(value int) Options + // ConnectionWriteTimeout returns the timeout for writing to the connection. + ConnectionWriteTimeout() time.Duration + + // SetConnectionWriteTimeout sets the write timeout for the connection. + SetConnectionWriteTimeout(value time.Duration) Options + // InstrumentOptions returns the instrument options. InstrumentOptions() instrument.Options