Skip to content

Commit

Permalink
fix: channel min size is 1 and change Input interface
Browse files Browse the repository at this point in the history
  • Loading branch information
joway committed Dec 19, 2023
1 parent a07c30a commit 9720685
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 127 deletions.
133 changes: 67 additions & 66 deletions lang/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ package channel

import (
"container/list"
"runtime"
"sync"
"sync/atomic"
"time"
)

const (
defaultThrottleWindow = time.Millisecond * 100
defaultSize = 0
defaultMinSize = 1
)

type item struct {
Expand All @@ -45,12 +46,12 @@ type Option func(c *channel)
// Throttle define channel Throttle function
type Throttle func(c Channel) bool

// WithSize define the size of channel.
// WithSize define the size of channel. If channel is full, it will block.
// It conflicts with WithNonBlock option.
func WithSize(size int) Option {
return func(c *channel) {
// with non block mode, no need to change size
if !c.nonblock {
if size >= defaultMinSize && !c.nonblock {
c.size = size
}
}
Expand All @@ -61,7 +62,6 @@ func WithSize(size int) Option {
func WithNonBlock() Option {
return func(c *channel) {
c.nonblock = true
c.size = 1024
}
}

Expand Down Expand Up @@ -146,37 +146,43 @@ func WithRateThrottle(produceRate, consumeRate int) Option {
})
}

var _ Channel = (*channel)(nil)
var (
_ Channel = (*channel)(nil)
)

// Channel is a safe and feature-rich alternative for Go chan struct
type Channel interface {
// Input return a native chan for produce task
Input() chan interface{}
// Output return a native chan for consume task
Output() chan interface{}
// Len return the count of un-consumed tasks
// Input send value to Output channel. If channel is closed, do nothing and will not panic.
Input(v interface{})
// Output return a read-only native chan for consumer.
Output() <-chan interface{}
// Len return the count of un-consumed items.
Len() int
// Stats return the produced and consumed count
// Stats return the produced and consumed count.
Stats() (produced uint64, consumed uint64)
// Close will close the producer and consumer goroutines gracefully
// Close closed the output chan. If channel is not closed explicitly, it will be closed when it's finalized.
Close()
}

// channelWrapper use to detect user never hold the reference of Channel object, and runtime will help to close channel implicitly.
type channelWrapper struct {
Channel
}

// channel implements a safe and feature-rich channel struct for the real world.
type channel struct {
size int
state int32
producer chan interface{}
consumer chan interface{}
nonblock bool // non blocking mode
timeout time.Duration
timeoutCallback func(interface{})
producerThrottle Throttle
consumerThrottle Throttle
throttleWindow time.Duration
// statistics
produced uint64
consumed uint64
// non blocking mode
nonblock bool
produced uint64 // item already been insert into buffer
consumed uint64 // item already been sent into Output chan
// buffer
buffer *list.List // TODO: use high perf queue to reduce GC here
bufferCond *sync.Cond
Expand All @@ -186,27 +192,30 @@ type channel struct {
// New create a new channel.
func New(opts ...Option) Channel {
c := new(channel)
c.size = defaultSize
c.size = defaultMinSize
c.throttleWindow = defaultThrottleWindow
c.bufferCond = sync.NewCond(&c.bufferLock)
for _, opt := range opts {
opt(c)
}
c.producer = make(chan interface{}, c.size)
c.consumer = make(chan interface{})
c.buffer = list.New()
go c.produce()
go c.consume()
return c

// register finalizer for wrapper of channel
cw := &channelWrapper{c}
runtime.SetFinalizer(cw, func(obj *channelWrapper) {
// it's ok to call Close again if user already closed the channel
obj.Close()
})
return cw
}

// Close will close the producer and consumer goroutines gracefully
func (c *channel) Close() {
if !atomic.CompareAndSwapInt32(&c.state, 0, -1) {
return
}
// stop producer
close(c.producer)
// stop consumer
c.bufferLock.Lock()
c.buffer.Init() // clear buffer
Expand All @@ -218,17 +227,44 @@ func (c *channel) isClosed() bool {
return atomic.LoadInt32(&c.state) < 0
}

// Input return a native chan for produce task
func (c *channel) Input() chan interface{} {
return c.producer
func (c *channel) Input(v interface{}) {
if c.isClosed() {
return
}

// prepare item
it := item{value: v}
if c.timeout > 0 {
it.deadline = time.Now().Add(c.timeout)
}

// only check throttle function in blocking mode
if !c.nonblock {
if c.throttling(c.producerThrottle) {
// closed
return
}
}

// enqueue buffer
c.bufferLock.Lock()
if !c.nonblock {
// only check length with blocking mode
for c.buffer.Len() >= c.size {
// wait for consuming
c.bufferCond.Wait()
}
}
c.enqueueBuffer(it)
atomic.AddUint64(&c.produced, 1)
c.bufferLock.Unlock()
c.bufferCond.Signal() // use Signal because only 1 goroutine wait for cond
}

// Output return a native chan for consume task
func (c *channel) Output() chan interface{} {
func (c *channel) Output() <-chan interface{} {
return c.consumer
}

// Len return the count of un-consumed tasks.
func (c *channel) Len() int {
produced, consumed := c.Stats()
l := produced - consumed
Expand All @@ -240,42 +276,7 @@ func (c *channel) Stats() (uint64, uint64) {
return produced, consumed
}

// produce used to process input channel
func (c *channel) produce() {
capacity := c.size
if c.size == 0 {
capacity = 1
}
for p := range c.producer {
// only check throttle function in blocking mode
if !c.nonblock {
if c.throttling(c.producerThrottle) {
// closed
return
}
}

// produced
atomic.AddUint64(&c.produced, 1)
// prepare item
it := item{value: p}
if c.timeout > 0 {
it.deadline = time.Now().Add(c.timeout)
}
// enqueue buffer
c.bufferLock.Lock()
c.enqueueBuffer(it)
c.bufferCond.Signal()
if !c.nonblock {
for c.buffer.Len() >= capacity && !c.isClosed() {
c.bufferCond.Wait()
}
}
c.bufferLock.Unlock()
}
}

// consume used to process output channel
// consume used to process input buffer
func (c *channel) consume() {
for {
// check throttle
Expand All @@ -297,7 +298,7 @@ func (c *channel) consume() {
}
it, ok := c.dequeueBuffer()
c.bufferLock.Unlock()
c.bufferCond.Signal()
c.bufferCond.Broadcast() // use Broadcast because there will be more than 1 goroutines wait for cond
if !ok {
// in fact, this case will never happen
continue
Expand Down
2 changes: 1 addition & 1 deletion lang/channel/channel_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type response struct {
var taskPool Channel

func Service1(req *request) {
taskPool.Input() <- req // async run
taskPool.Input(req)
return
}

Expand Down
Loading

0 comments on commit 9720685

Please sign in to comment.