Skip to content

Commit

Permalink
Add a configurable write timeout for consumer ACKs (#2675)
Browse files Browse the repository at this point in the history
Without a timeout this can potentially block forever.

The producer and consumer can potentially dead lock trying to send/receive messages:

Producer -> msg -> Consumer. Consumer is not attempting to read since it's trying to ACK.
Consumer -> ack -> Producer. Producer is not attempting to read since it might be stuck trying to get a lock on the connection.

For backwards compatibility this defaults to 0. We should set it to a sane default (5s) in the future.
  • Loading branch information
ryanhall07 authored Sep 28, 2020
1 parent 80c9f9e commit 3c81774
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 1 deletion.
4 changes: 4 additions & 0 deletions src/msg/consumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type Configuration struct {
AckBufferSize *int `yaml:"ackBufferSize"`
ConnectionWriteBufferSize *int `yaml:"connectionWriteBufferSize"`
ConnectionReadBufferSize *int `yaml:"connectionReadBufferSize"`
ConnectionWriteTimeout *time.Duration `yaml:"connectionWriteTimeout"`
}

// MessagePoolConfiguration is the message pool configuration
Expand Down Expand Up @@ -92,5 +93,8 @@ func (c *Configuration) NewOptions(iOpts instrument.Options) Options {
if c.ConnectionReadBufferSize != nil {
opts = opts.SetConnectionReadBufferSize(*c.ConnectionReadBufferSize)
}
if c.ConnectionWriteTimeout != nil {
opts = opts.SetConnectionWriteTimeout(*c.ConnectionWriteTimeout)
}
return opts
}
25 changes: 24 additions & 1 deletion src/msg/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/m3db/m3/src/msg/generated/proto/msgpb"
"github.com/m3db/m3/src/msg/protocol/proto"
"github.com/m3db/m3/src/x/clock"
xio "github.com/m3db/m3/src/x/io"

"github.com/uber-go/tally"
Expand Down Expand Up @@ -125,7 +126,7 @@ func newConsumer(
decoder: proto.NewDecoder(
conn, opts.DecoderOptions(), opts.ConnectionReadBufferSize(),
),
w: writerFn(conn, wOpts),
w: writerFn(newConnWithTimeout(conn, opts.ConnectionWriteTimeout(), time.Now), wOpts),
conn: conn,
closed: false,
doneCh: make(chan struct{}),
Expand Down Expand Up @@ -263,3 +264,25 @@ func resetProto(m *msgpb.Message) {
m.Metadata.Shard = 0
m.Value = m.Value[:0]
}

type connWithTimeout struct {
net.Conn

timeout time.Duration
nowFn clock.NowFn
}

func newConnWithTimeout(conn net.Conn, timeout time.Duration, nowFn clock.NowFn) connWithTimeout {
return connWithTimeout{
Conn: conn,
timeout: timeout,
nowFn: nowFn,
}
}

func (conn connWithTimeout) Write(p []byte) (int, error) {
if conn.timeout > 0 {
conn.SetWriteDeadline(conn.nowFn().Add(conn.timeout))
}
return conn.Conn.Write(p)
}
14 changes: 14 additions & 0 deletions src/msg/consumer/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ var (
defaultAckBufferSize = 1048576
defaultAckFlushInterval = 200 * time.Millisecond
defaultConnectionBufferSize = 1048576
// TODO(ryanhall07): set this to 5s once we verify this works.
defaultWriteTimeout = 0 * time.Second
)

type options struct {
Expand All @@ -43,6 +45,7 @@ type options struct {
ackBufferSize int
writeBufferSize int
readBufferSize int
writeTimeout time.Duration
iOpts instrument.Options
rwOpts xio.Options
}
Expand All @@ -57,6 +60,7 @@ func NewOptions() Options {
ackBufferSize: defaultAckBufferSize,
writeBufferSize: defaultConnectionBufferSize,
readBufferSize: defaultConnectionBufferSize,
writeTimeout: defaultWriteTimeout,
iOpts: instrument.NewOptions(),
rwOpts: xio.NewOptions(),
}
Expand Down Expand Up @@ -132,6 +136,16 @@ func (opts *options) SetConnectionReadBufferSize(value int) Options {
return &o
}

func (opts *options) ConnectionWriteTimeout() time.Duration {
return opts.writeTimeout
}

func (opts *options) SetConnectionWriteTimeout(value time.Duration) Options {
o := *opts
o.writeTimeout = value
return &o
}

func (opts *options) InstrumentOptions() instrument.Options {
return opts.iOpts
}
Expand Down
6 changes: 6 additions & 0 deletions src/msg/consumer/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ type Options interface {
// SetConnectionWriteBufferSize sets the buffer size.
SetConnectionReadBufferSize(value int) Options

// ConnectionWriteTimeout returns the timeout for writing to the connection.
ConnectionWriteTimeout() time.Duration

// SetConnectionWriteTimeout sets the write timeout for the connection.
SetConnectionWriteTimeout(value time.Duration) Options

// InstrumentOptions returns the instrument options.
InstrumentOptions() instrument.Options

Expand Down

0 comments on commit 3c81774

Please sign in to comment.