From 244db1f8761ba73dabeda8549edd1a5144f3c03a Mon Sep 17 00:00:00 2001 From: Aaron Zinger Date: Wed, 13 Apr 2022 11:43:05 -0400 Subject: [PATCH] changefeedccl: tolerate nil error message keys in kafkfa sink Probably Sarama was updated recently, because our roachtest for behavior when Kafka is unstable just started failing. The reason looks to be that Sarama can now create ProducerErrors with nil keys and values, and we have an error wrapper that is caught off guard by this. Probably what happened in the test is a dummy message used to prefetch metadata failed due to the Kafka server being down, and then the sink_kafka worker hit a nil pointer panic trying to wrap it. But of course we've lost the error in the panic. Anyway, now we'll just surface the raw error if there's no message key. Release note (bug fix): Fixed a bug that may have caused a panic if a Kafka server being written to by a changefeed failed at the wrong moment. --- pkg/ccl/changefeedccl/sink_kafka.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/pkg/ccl/changefeedccl/sink_kafka.go b/pkg/ccl/changefeedccl/sink_kafka.go index e3ba61766038..714b7ed81d72 100644 --- a/pkg/ccl/changefeedccl/sink_kafka.go +++ b/pkg/ccl/changefeedccl/sink_kafka.go @@ -357,9 +357,14 @@ func (s *kafkaSink) workerLoop() { case err := <-s.producer.Errors(): ackMsg, ackError = err.Msg, err.Err if ackError != nil { - ackError = errors.Wrapf(ackError, - "while sending message with key=%s, size=%d", - err.Msg.Key, err.Msg.Key.Length()+err.Msg.Value.Length()) + // Msg should never be nil but we're being defensive around a vendor library. + // Msg.Key is nil for sentinel errors (e.g. producer shutting down) + // and errors sending dummy messages used to prefetch metadata. + if err.Msg != nil && err.Msg.Key != nil && err.Msg.Value != nil { + ackError = errors.Wrapf(ackError, + "while sending message with key=%s, size=%d", + err.Msg.Key, err.Msg.Key.Length()+err.Msg.Value.Length()) + } } }