Skip to content

Commit

Permalink
Merge pull request #388 from JohnRoesler/kafka-addr
Browse files Browse the repository at this point in the history
split the kafka addr right away as it's used a few places inside NewEventBus()
  • Loading branch information
maxekman authored Apr 23, 2022
2 parents ba75821 + 873d243 commit db3d139
Showing 1 changed file with 8 additions and 6 deletions.
14 changes: 8 additions & 6 deletions eventbus/kafka/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
// to all matching registered handlers, in order of registration.
type EventBus struct {
// TODO: Support multiple brokers.
addr string // comma delimited list of brokers
addresses []string
appID string
topic string
startOffset int64
Expand All @@ -49,11 +49,13 @@ type EventBus struct {
}

// NewEventBus creates an EventBus, with optional GCP connection settings.
func NewEventBus(addr, appID string, options ...Option) (*EventBus, error) {
func NewEventBus(addressList, appID string, options ...Option) (*EventBus, error) {
ctx, cancel := context.WithCancel(context.Background())

addrSplit := strings.Split(addressList, ",")

b := &EventBus{
addr: addr,
addresses: addrSplit,
appID: appID,
topic: appID + "_events",
startOffset: kafka.LastOffset, // Default: Don't read old messages.
Expand All @@ -77,7 +79,7 @@ func NewEventBus(addr, appID string, options ...Option) (*EventBus, error) {

// Get or create the topic.
b.client = &kafka.Client{
Addr: kafka.TCP(addr),
Addr: kafka.TCP(addrSplit...),
}

var resp *kafka.CreateTopicsResponse
Expand Down Expand Up @@ -115,7 +117,7 @@ func NewEventBus(addr, appID string, options ...Option) (*EventBus, error) {
}

b.writer = &kafka.Writer{
Addr: kafka.TCP(addr),
Addr: kafka.TCP(addrSplit...),
Topic: b.topic,
BatchSize: 1, // Write every event to the bus without delay.
RequiredAcks: kafka.RequireOne, // Stronger consistency.
Expand Down Expand Up @@ -219,7 +221,7 @@ func (b *EventBus) AddHandler(ctx context.Context, m eh.EventMatcher, h eh.Event
groupID := b.appID + "_" + h.HandlerType().String()

r := kafka.NewReader(kafka.ReaderConfig{
Brokers: strings.Split(b.addr, ","),
Brokers: b.addresses,
Topic: b.topic,
GroupID: groupID, // Send messages to only one subscriber per group.
MaxWait: time.Second, // Allow to exit readloop in max 1s.
Expand Down

0 comments on commit db3d139

Please sign in to comment.