From d11b5df8875d8b2c4f1fe6a1e51d60e14977d625 Mon Sep 17 00:00:00 2001 From: Andy Date: Sat, 17 Jul 2021 02:20:06 +0800 Subject: [PATCH] Fix connecting to Kafka with IPv6 addresses Co-authored-by: andyfwang Co-authored-by: Nicholas Sun --- consumergroup.go | 4 +++- initproducerid_test.go | 5 +++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/consumergroup.go b/consumergroup.go index 92c622c78..00380ce9b 100644 --- a/consumergroup.go +++ b/consumergroup.go @@ -8,6 +8,8 @@ import ( "fmt" "io" "math" + "net" + "strconv" "strings" "sync" "time" @@ -861,7 +863,7 @@ func (cg *ConsumerGroup) coordinator() (coordinator, error) { return nil, err } - address := fmt.Sprintf("%v:%v", out.Coordinator.Host, out.Coordinator.Port) + address := net.JoinHostPort(out.Coordinator.Host, strconv.Itoa(int(out.Coordinator.Port))) return cg.config.connect(cg.config.Dialer, address) } diff --git a/initproducerid_test.go b/initproducerid_test.go index 5f1bb59aa..480175f40 100644 --- a/initproducerid_test.go +++ b/initproducerid_test.go @@ -3,7 +3,8 @@ package kafka import ( "context" "errors" - "fmt" + "net" + "strconv" "testing" "time" @@ -31,7 +32,7 @@ func TestClientInitProducerId(t *testing.T) { } // Now establish a connection with the transaction coordinator - transactionCoordinator := TCP(fmt.Sprintf("%s:%d", respc.Coordinator.Host, respc.Coordinator.Port)) + transactionCoordinator := TCP(net.JoinHostPort(respc.Coordinator.Host, strconv.Itoa(int(respc.Coordinator.Port)))) client, shutdown = newClient(transactionCoordinator) defer shutdown()