diff --git a/eventbus/kafka/eventbus.go b/eventbus/kafka/eventbus.go index 6b8770bb..3e65d9de 100644 --- a/eventbus/kafka/eventbus.go +++ b/eventbus/kafka/eventbus.go @@ -33,7 +33,7 @@ import ( // to all matching registered handlers, in order of registration. type EventBus struct { // TODO: Support multiple brokers. - addr string // comma delimited list of brokers + addresses []string appID string topic string startOffset int64 @@ -49,11 +49,13 @@ type EventBus struct { } // NewEventBus creates an EventBus, with optional GCP connection settings. -func NewEventBus(addr, appID string, options ...Option) (*EventBus, error) { +func NewEventBus(addressList, appID string, options ...Option) (*EventBus, error) { ctx, cancel := context.WithCancel(context.Background()) + addrSplit := strings.Split(addressList, ",") + b := &EventBus{ - addr: addr, + addresses: addrSplit, appID: appID, topic: appID + "_events", startOffset: kafka.LastOffset, // Default: Don't read old messages. @@ -77,7 +79,7 @@ func NewEventBus(addr, appID string, options ...Option) (*EventBus, error) { // Get or create the topic. b.client = &kafka.Client{ - Addr: kafka.TCP(addr), + Addr: kafka.TCP(addrSplit...), } var resp *kafka.CreateTopicsResponse @@ -115,7 +117,7 @@ func NewEventBus(addr, appID string, options ...Option) (*EventBus, error) { } b.writer = &kafka.Writer{ - Addr: kafka.TCP(addr), + Addr: kafka.TCP(addrSplit...), Topic: b.topic, BatchSize: 1, // Write every event to the bus without delay. RequiredAcks: kafka.RequireOne, // Stronger consistency. @@ -219,7 +221,7 @@ func (b *EventBus) AddHandler(ctx context.Context, m eh.EventMatcher, h eh.Event groupID := b.appID + "_" + h.HandlerType().String() r := kafka.NewReader(kafka.ReaderConfig{ - Brokers: strings.Split(b.addr, ","), + Brokers: b.addresses, Topic: b.topic, GroupID: groupID, // Send messages to only one subscriber per group. MaxWait: time.Second, // Allow to exit readloop in max 1s.