diff --git a/libbeat/outputs/kafka/client.go b/libbeat/outputs/kafka/client.go
index 1780f1392b3..1e110f8ca3e 100644
--- a/libbeat/outputs/kafka/client.go
+++ b/libbeat/outputs/kafka/client.go
@@ -113,7 +113,7 @@ func newKafkaClient(
 	return c, nil
 }
 
-func (c *client) Connect() error {
+func (c *client) Connect(_ context.Context) error {
 	c.mux.Lock()
 	defer c.mux.Unlock()
 
diff --git a/libbeat/outputs/kafka/kafka_integration_test.go b/libbeat/outputs/kafka/kafka_integration_test.go
index e9abc559774..b187b5d57ff 100644
--- a/libbeat/outputs/kafka/kafka_integration_test.go
+++ b/libbeat/outputs/kafka/kafka_integration_test.go
@@ -280,7 +280,7 @@ func TestKafkaPublish(t *testing.T) {
 
 			output, ok := grp.Clients[0].(*client)
 			assert.True(t, ok, "grp.Clients[0] didn't contain a ptr to client")
-			if err := output.Connect(); err != nil {
+			if err := output.Connect(context.Background()); err != nil {
 				t.Fatal(err)
 			}
 			assert.Equal(t, output.index, "testbeat")
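
The `Connect` methods on the output clients now accept a `context.Context`, matching the call site above that passes `context.Background()`. A minimal sketch of what this enables for callers follows; the `connector` interface and `connectWithTimeout` helper are illustrative assumptions, not the actual libbeat definitions.

```go
package example

import (
	"context"
	"time"
)

// connector mirrors the updated method shape from this diff: Connect now
// takes a context instead of no arguments.
type connector interface {
	Connect(ctx context.Context) error
}

// connectWithTimeout shows how a caller can bound a connection attempt once
// Connect accepts a context. The timeout value is arbitrary.
func connectWithTimeout(c connector, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return c.Connect(ctx)
}
```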
diff --git a/libbeat/outputs/redis/backoff.go b/libbeat/outputs/redis/backoff.go
index 2abc1f846f0..42f9db1c285 100644
--- a/libbeat/outputs/redis/backoff.go
+++ b/libbeat/outputs/redis/backoff.go
@@ -19,6 +19,7 @@ package redis
 
 import (
 	"context"
+	"errors"
 	"time"
 
 	"github.com/gomodule/redigo/redis"
@@ -61,7 +62,7 @@ func newBackoffClient(client *client, init, max time.Duration) *backoffClient {
 }
 
 func (b *backoffClient) Connect(ctx context.Context) error {
-	err := b.client.Connect()
+	err := b.client.Connect(ctx)
 	if err != nil {
 		// give the client a chance to promote an internal error to a network error.
 		b.updateFailReason(err)
@@ -102,7 +103,8 @@ func (b *backoffClient) updateFailReason(err error) {
 		return
 	}
 
-	if _, ok := err.(redis.Error); ok {
+	var redisErr redis.Error
+	if errors.As(err, &redisErr) {
 		b.reason = failRedis
 	} else {
 		b.reason = failOther
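
Switching from a direct type assertion to `errors.As` means a `redis.Error` that has been wrapped (for example with `fmt.Errorf("...: %w", err)`) is still classified as a Redis failure. Below is a small standalone sketch of the difference, assuming redigo's string-backed `redis.Error` type; the `classify` helper is hypothetical.

```go
package example

import (
	"errors"
	"fmt"

	"github.com/gomodule/redigo/redis"
)

// classify mirrors the updated updateFailReason logic: errors.As walks the
// wrap chain, so a wrapped redis.Error is still recognized, whereas the old
// err.(redis.Error) assertion only matched the outermost error value.
func classify(err error) string {
	var redisErr redis.Error
	if errors.As(err, &redisErr) {
		return "redis"
	}
	return "other"
}

func example() {
	wrapped := fmt.Errorf("publish failed: %w", redis.Error("ERR example"))

	fmt.Println(classify(wrapped)) // "redis": errors.As unwraps the chain

	_, ok := wrapped.(redis.Error) // the old assertion does not unwrap
	fmt.Println(ok)                // false
}
```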
diff --git a/libbeat/outputs/redis/client.go b/libbeat/outputs/redis/client.go
index 9f5c9812dd1..db3ec5a3b43 100644
--- a/libbeat/outputs/redis/client.go
+++ b/libbeat/outputs/redis/client.go
@@ -90,7 +90,7 @@ func newClient(
 	}
 }
 
-func (c *client) Connect() error {
+func (c *client) Connect(_ context.Context) error {
 	c.log.Debug("connect")
 	err := c.Client.Connect()
 	if err != nil {
diff --git a/libbeat/tests/integration/kafka_test.go b/libbeat/tests/integration/kafka_test.go
new file mode 100644
index 00000000000..72e5b37e49d
--- /dev/null
+++ b/libbeat/tests/integration/kafka_test.go
@@ -0,0 +1,89 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//go:build integration
+
+package integration
+
+import (
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/Shopify/sarama"
+)
+
+var (
+	// https://github.com/elastic/sarama/blob/c7eabfcee7e5bcd7d0071f0ece4d6bec8c33928a/config_test.go#L14-L17
+	// The version of MockBroker used when this test was written only supports the lowest protocol version by default.
+	// Version incompatibilities will result in message decoding errors between the mock and the beat.
+	kafkaVersion = sarama.MinVersion
+	kafkaTopic   = "test_topic"
+	kafkaCfg     = `
+mockbeat:
+logging:
+  level: debug
+  selectors:
+    - publisher_pipeline_output
+    - kafka
+queue.mem:
+  events: 4096
+  flush.timeout: 0s
+output.kafka:
+  topic: %s
+  version: %s
+  hosts:
+    - %s
+  backoff:
+    init: 0.1s
+    max: 0.2s
+`
+)
+
+// TestKafkaOutputCanConnectAndPublish ensures the Beat's Kafka output can successfully produce messages to Kafka.
+// Regression test for https://github.com/elastic/beats/issues/41823 where the Kafka output would
+// panic on the first Publish because its Connect method was no longer called.
+func TestKafkaOutputCanConnectAndPublish(t *testing.T) {
+	// Create a Mock Kafka broker that will listen on localhost on a random unallocated port.
+	// The reference configuration was taken from https://github.com/elastic/sarama/blob/c7eabfcee7e5bcd7d0071f0ece4d6bec8c33928a/async_producer_test.go#L141.
+	leader := sarama.NewMockBroker(t, 1)
+	defer leader.Close()
+
+	// The mock broker must respond to a single metadata request.
+	metadataResponse := new(sarama.MetadataResponse)
+	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
+	metadataResponse.AddTopicPartition(kafkaTopic, 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError)
+	leader.Returns(metadataResponse)
+
+	// The mock broker must return a single produce response. If no produce request is received, the test will fail.
+	// This guarantees that mockbeat successfully produced a message to Kafka and connectivity is established.
+	prodSuccess := new(sarama.ProduceResponse)
+	prodSuccess.AddTopicPartition(kafkaTopic, 0, sarama.ErrNoError)
+	leader.Returns(prodSuccess)
+
+	// Start mockbeat with the appropriate configuration.
+	mockbeat := NewBeat(t, "mockbeat", "../../libbeat.test")
+	mockbeat.WriteConfigFile(fmt.Sprintf(kafkaCfg, kafkaTopic, kafkaVersion, leader.Addr()))
+	mockbeat.Start()
+
+	// Wait for mockbeat to log that it successfully published a batch to Kafka.
+	// This ensures that mockbeat received the expected produce response configured above.
+	mockbeat.WaitForLogs(
+		`finished kafka batch`,
+		10*time.Second,
+		"did not find finished batch log")
+}
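
For reference, the MockBroker expectations configured in this test (one metadata response, then one produce success) are the same handshake a plain sarama producer needs to publish a single message. The sketch below is a standalone illustration of that exchange, not part of the beats test suite; the test name and message contents are made up.

```go
//go:build integration

package integration

import (
	"testing"

	"github.com/Shopify/sarama"
)

// TestMockBrokerProduceHandshake is a hypothetical, self-contained example of
// the metadata-then-produce exchange the mockbeat test above relies on.
func TestMockBrokerProduceHandshake(t *testing.T) {
	leader := sarama.NewMockBroker(t, 1)
	defer leader.Close()

	// First the client requests metadata and learns that this broker leads
	// partition 0 of the topic.
	metadata := new(sarama.MetadataResponse)
	metadata.AddBroker(leader.Addr(), leader.BrokerID())
	metadata.AddTopicPartition("test_topic", 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError)
	leader.Returns(metadata)

	// Then it sends a produce request and expects a success response.
	produce := new(sarama.ProduceResponse)
	produce.AddTopicPartition("test_topic", 0, sarama.ErrNoError)
	leader.Returns(produce)

	cfg := sarama.NewConfig()
	cfg.Version = sarama.MinVersion
	cfg.Producer.Return.Successes = true // required by SyncProducer

	producer, err := sarama.NewSyncProducer([]string{leader.Addr()}, cfg)
	if err != nil {
		t.Fatal(err)
	}
	defer producer.Close()

	msg := &sarama.ProducerMessage{Topic: "test_topic", Value: sarama.StringEncoder("hello")}
	if _, _, err := producer.SendMessage(msg); err != nil {
		t.Fatal(err)
	}
}
```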