Skip to content

Commit

Permalink
Merge #79897
Browse files Browse the repository at this point in the history
79897: changefeedccl: tolerate nil error message keys in kafkfa sink r=[miretskiy] a=HonoreDB

Probably Sarama was updated recently, because our roachtest
for behavior when Kafka is unstable just started failing.
The reason looks to be that Sarama can now create
ProducerErrors with nil keys and values, and we have an
error wrapper that is caught off guard by this.

Probably what happened in the test is a dummy message
used to prefetch metadata failed due to the Kafka server
being down, and then the sink_kafka worker hit a nil pointer
panic trying to wrap it. But of course we've lost the error
in the panic.

Anyway, now we'll just surface the raw error if there's
no message key.

Release note (bug fix): Fixed a bug that may have caused a panic if a Kafka server being written to by a changefeed failed at the wrong moment.

Co-authored-by: Aaron Zinger <[email protected]>
  • Loading branch information
craig[bot] and HonoreDB committed Apr 13, 2022
2 parents c98f9dd + 244db1f commit a43afc6
Showing 1 changed file with 8 additions and 3 deletions.
11 changes: 8 additions & 3 deletions pkg/ccl/changefeedccl/sink_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,9 +357,14 @@ func (s *kafkaSink) workerLoop() {
case err := <-s.producer.Errors():
ackMsg, ackError = err.Msg, err.Err
if ackError != nil {
ackError = errors.Wrapf(ackError,
"while sending message with key=%s, size=%d",
err.Msg.Key, err.Msg.Key.Length()+err.Msg.Value.Length())
// Msg should never be nil but we're being defensive around a vendor library.
// Msg.Key is nil for sentinel errors (e.g. producer shutting down)
// and errors sending dummy messages used to prefetch metadata.
if err.Msg != nil && err.Msg.Key != nil && err.Msg.Value != nil {
ackError = errors.Wrapf(ackError,
"while sending message with key=%s, size=%d",
err.Msg.Key, err.Msg.Key.Length()+err.Msg.Value.Length())
}
}
}

Expand Down

0 comments on commit a43afc6

Please sign in to comment.