Skip to content

Commit

Permalink
Fix connecting to Kafka with IPv6 addresses
Browse files Browse the repository at this point in the history
Co-authored-by: andyfwang <[email protected]>
Co-authored-by: Nicholas Sun <[email protected]>
  • Loading branch information
3 people authored Jul 16, 2021
1 parent d45624f commit d11b5df
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 3 deletions.
4 changes: 3 additions & 1 deletion consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"fmt"
"io"
"math"
"net"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -861,7 +863,7 @@ func (cg *ConsumerGroup) coordinator() (coordinator, error) {
return nil, err
}

address := fmt.Sprintf("%v:%v", out.Coordinator.Host, out.Coordinator.Port)
address := net.JoinHostPort(out.Coordinator.Host, strconv.Itoa(int(out.Coordinator.Port)))
return cg.config.connect(cg.config.Dialer, address)
}

Expand Down
5 changes: 3 additions & 2 deletions initproducerid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ package kafka
import (
"context"
"errors"
"fmt"
"net"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -31,7 +32,7 @@ func TestClientInitProducerId(t *testing.T) {
}

// Now establish a connection with the transaction coordinator
transactionCoordinator := TCP(fmt.Sprintf("%s:%d", respc.Coordinator.Host, respc.Coordinator.Port))
transactionCoordinator := TCP(net.JoinHostPort(respc.Coordinator.Host, strconv.Itoa(int(respc.Coordinator.Port))))
client, shutdown = newClient(transactionCoordinator)
defer shutdown()

Expand Down

0 comments on commit d11b5df

Please sign in to comment.