Skip to content

Commit

Permalink
Merge pull request #33 from Ccheers/dev
Browse files Browse the repository at this point in the history
feat(msgbus/memory): chan 推入消息 逻辑调优
  • Loading branch information
Ccheers authored Nov 8, 2024
2 parents 38216dc + a4bb845 commit 76175f6
Showing 1 changed file with 43 additions and 5 deletions.
48 changes: 43 additions & 5 deletions xmsgbus/impl/memory/msgbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,47 @@ import (
"github.com/ccheers/xpkg/xmsgbus"
)

var (
ErrChanIsFull = fmt.Errorf("channel is full")
)

type msgBusOptions struct {
maxBuffer int
}

func defaultMsgBusOptions() msgBusOptions {
return msgBusOptions{maxBuffer: 32}
}

type IMsgBusOption interface {
apply(*msgBusOptions)
}

type MsgBusOptionFunc func(*msgBusOptions)

func (fn MsgBusOptionFunc) apply(options *msgBusOptions) {
fn(options)
}

func WithMsgBusMaxBufferOption(maxBuffer int) MsgBusOptionFunc {
return func(options *msgBusOptions) {
options.maxBuffer = maxBuffer
}
}

type MsgBus struct {
opts msgBusOptions
mu sync.Mutex
topicSet map[string]map[string]chan []byte
}

func NewMsgBus() xmsgbus.IMsgBus {
func NewMsgBus(options ...IMsgBusOption) xmsgbus.IMsgBus {
opts := defaultMsgBusOptions()
for _, opt := range options {
opt.apply(&opts)
}
return &MsgBus{
opts: opts,
mu: sync.Mutex{},
topicSet: make(map[string]map[string]chan []byte),
}
Expand All @@ -24,14 +58,18 @@ func NewMsgBus() xmsgbus.IMsgBus {
func (x *MsgBus) Push(ctx context.Context, topic string, bs []byte) error {
x.mu.Lock()
defer x.mu.Unlock()
for _, ch := range x.topicSet[topic] {
var fullChans []string
for chanName, ch := range x.topicSet[topic] {
select {
case ch <- bs:
// 满时丢弃
default:
return fmt.Errorf("channel is full")
fullChans = append(fullChans, chanName)
}
}
if len(fullChans) > 0 {
return fmt.Errorf("%w: channels=%+v", ErrChanIsFull, fullChans)
}
return nil
}

Expand All @@ -41,7 +79,7 @@ func (x *MsgBus) Pop(ctx context.Context, topic, channel string, blockTimeout ti
x.topicSet[topic] = make(map[string]chan []byte)
}
if x.topicSet[topic][channel] == nil {
x.topicSet[topic][channel] = make(chan []byte, 32)
x.topicSet[topic][channel] = make(chan []byte, x.opts.maxBuffer)
}
ch := x.topicSet[topic][channel]
x.mu.Unlock()
Expand Down Expand Up @@ -71,7 +109,7 @@ func (x *MsgBus) AddChannel(ctx context.Context, topic string, channel string) e
x.topicSet[topic] = make(map[string]chan []byte)
}
if x.topicSet[topic][channel] == nil {
x.topicSet[topic][channel] = make(chan []byte, 32)
x.topicSet[topic][channel] = make(chan []byte, x.opts.maxBuffer)
}
return nil
}
Expand Down

0 comments on commit 76175f6

Please sign in to comment.