diff --git a/CHANGELOG.md b/CHANGELOG.md index d3d758058..4a418674f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,5 +19,6 @@ * [ENHANCEMENT] Add spanlogger package. #42 * [ENHANCEMENT] Add runutil.CloseWithLogOnErr function. #58 * [ENHANCEMENT] Optimise memberlist receive path when used as a backing store for rings with a large number of members. #76 #77 +* [EHHANCEMENT] Memberlist: `-memberlist.packet-write-timeout` config option is now handled as an idle timeout for individual write operations instead of the max time the whole packet transferring can take. #87 * [BUGFIX] spanlogger: Support multiple tenant IDs. #59 * [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85 diff --git a/kv/memberlist/tcp_transport.go b/kv/memberlist/tcp_transport.go index afbd1b201..7a6065979 100644 --- a/kv/memberlist/tcp_transport.go +++ b/kv/memberlist/tcp_transport.go @@ -66,7 +66,7 @@ func (cfg *TCPTransportConfig) RegisterFlags(f *flag.FlagSet, prefix string) { f.Var(&cfg.BindAddrs, prefix+"memberlist.bind-addr", "IP address to listen on for gossip messages. Multiple addresses may be specified. Defaults to 0.0.0.0") f.IntVar(&cfg.BindPort, prefix+"memberlist.bind-port", 7946, "Port to listen on for gossip messages.") f.DurationVar(&cfg.PacketDialTimeout, prefix+"memberlist.packet-dial-timeout", 5*time.Second, "Timeout used when connecting to other nodes to send packet.") - f.DurationVar(&cfg.PacketWriteTimeout, prefix+"memberlist.packet-write-timeout", 5*time.Second, "Timeout for writing 'packet' data.") + f.DurationVar(&cfg.PacketWriteTimeout, prefix+"memberlist.packet-write-timeout", 5*time.Second, "Idle timeout when writing 'packet' data to other nodes.") f.BoolVar(&cfg.TransportDebug, prefix+"memberlist.transport-debug", false, "Log debug transport messages. Note: global log.level must be at debug level as well.") f.BoolVar(&cfg.TLSEnabled, prefix+"memberlist.tls-enabled", false, "Enable TLS on the memberlist transport layer.") @@ -440,11 +440,7 @@ func (t *TCPTransport) writeTo(b []byte, addr string) error { }() if t.cfg.PacketWriteTimeout > 0 { - deadline := time.Now().Add(t.cfg.PacketWriteTimeout) - err := c.SetDeadline(deadline) - if err != nil { - return fmt.Errorf("setting deadline: %v", err) - } + c = newConnectionWithTimeout(c, t.cfg.PacketWriteTimeout) } buf := bytes.Buffer{} diff --git a/kv/memberlist/util.go b/kv/memberlist/util.go new file mode 100644 index 000000000..b0d8cb117 --- /dev/null +++ b/kv/memberlist/util.go @@ -0,0 +1,38 @@ +package memberlist + +import ( + "net" + "time" + + "github.com/pkg/errors" +) + +type connectionWithTimeout struct { + net.Conn + timeout time.Duration +} + +func newConnectionWithTimeout(conn net.Conn, timeout time.Duration) net.Conn { + return &connectionWithTimeout{ + Conn: conn, + timeout: timeout, + } +} + +// Read implements net.Conn. +func (c *connectionWithTimeout) Read(b []byte) (n int, err error) { + if err := c.Conn.SetDeadline(time.Now().Add(c.timeout)); err != nil { + return 0, errors.Wrap(err, "set deadline") + } + + return c.Conn.Read(b) +} + +// Write implements net.Conn. +func (c *connectionWithTimeout) Write(b []byte) (n int, err error) { + if err := c.Conn.SetDeadline(time.Now().Add(c.timeout)); err != nil { + return 0, errors.Wrap(err, "set deadline") + } + + return c.Conn.Write(b) +}