From b51603af22f40fee7d0ed84fe37be7c52418061c Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Wed, 30 Sep 2015 11:01:30 -0400 Subject: [PATCH] Make dead brokers die harder When a broker gets an error trying to receive a response (either from the network layer, or from failing to parse the minimal global header), it should just abandon ship and die. Save that error and return it immediately for any further requests we might have made. - The vast majority of the time the connection is going to be hosed anyways, if nothing else by being out-of-sync on correlation IDs (which we don't handle and which doesn't seem particularly urgent). - All of Sarama's built-in callers (producer/consumer/offset-manager) immediately `Close` a broker when they receive one of these errors anyways, so all this does is speed up that in the common case. *If* one of these errors is recoverable, and *if* there is user-space code somewhere which actually tries to recover in one of those cases, then that code would break. This neatly satisfies one of the XXX comments I left in about this issue from way back in 2013. The TODOs about correlation ID matching are still present. --- broker.go | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/broker.go b/broker.go index 6bdea3869..46f06a0f3 100644 --- a/broker.go +++ b/broker.go @@ -344,16 +344,24 @@ func (b *Broker) encode(pe packetEncoder) (err error) { } func (b *Broker) responseReceiver() { + var dead error header := make([]byte, 8) for response := range b.responses { + if dead != nil { + response.errors <- dead + continue + } + err := b.conn.SetReadDeadline(time.Now().Add(b.conf.Net.ReadTimeout)) if err != nil { + dead = err response.errors <- err continue } _, err = io.ReadFull(b.conn, header) if err != nil { + dead = err response.errors <- err continue } @@ -361,23 +369,22 @@ func (b *Broker) responseReceiver() { decodedHeader := responseHeader{} err = decode(header, &decodedHeader) if err != nil { + dead = err response.errors <- err continue } if decodedHeader.correlationID != response.correlationID { // TODO if decoded ID < cur ID, discard until we catch up // TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response - response.errors <- PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)} + dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)} + response.errors <- dead continue } buf := make([]byte, decodedHeader.length-4) _, err = io.ReadFull(b.conn, buf) if err != nil { - // XXX: the above ReadFull call inherits the same ReadDeadline set at the top of this loop, so it may - // fail with a timeout error. If this happens, our connection is permanently toast since we will no longer - // be aligned correctly on the stream (we'll be reading garbage Kafka headers from the middle of data). - // Can we/should we fail harder in that case? + dead = err response.errors <- err continue }