Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore] Polish channelmetrics package #2938

Merged
merged 1 commit into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions pkg/channelmetrics/noopcollector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package channelmetrics

import "time"

// noopCollector is a default implementation of the MetricsCollector interface
// for internal package use only.
type noopCollector struct{}

func (noopCollector) RecordProduceDuration(duration time.Duration) {}
func (noopCollector) RecordConsumeDuration(duration time.Duration) {}
func (noopCollector) RecordChannelLen(size int) {}
func (noopCollector) RecordChannelCap(capacity int) {}
30 changes: 18 additions & 12 deletions pkg/channelmetrics/observablechan.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,17 @@ type ObservableChan[T any] struct {
// distinguish between metrics for different channels by incorporating it into
// the metric names.
func NewObservableChan[T any](ch chan T, metrics MetricsCollector) *ObservableChan[T] {
if metrics == nil {
metrics = noopCollector{}
}
oChan := &ObservableChan[T]{
ch: ch,
metrics: metrics,
}
oChan.RecordChannelCapacity()
// Record the current length of the channel.
// Note: The channel is likely empty, but it may contain items if it was pre-existing.
// Note: The channel is likely empty, but it may contain items if it
// was pre-existing.
oChan.RecordChannelLen()
return oChan
}
Expand All @@ -51,12 +55,13 @@ func (oc *ObservableChan[T]) Close() {
}

// Send sends an item into the channel and records the duration taken to do so.
// It also updates the current size of the channel buffer.
// This method blocks until the item is sent.
// It also updates the current size of the channel buffer. This method blocks
// until the item is sent.
func (oc *ObservableChan[T]) Send(item T) { _ = oc.SendCtx(context.Background(), item) }

// SendCtx sends an item into the channel with context and records the duration taken to do so.
// It also updates the current size of the channel buffer and supports context cancellation.
// SendCtx sends an item into the channel with context and records the duration
// taken to do so. It also updates the current size of the channel buffer and
// supports context cancellation.
func (oc *ObservableChan[T]) SendCtx(ctx context.Context, item T) error {
defer func(start time.Time) {
oc.metrics.RecordProduceDuration(time.Since(start))
Expand All @@ -66,24 +71,25 @@ func (oc *ObservableChan[T]) SendCtx(ctx context.Context, item T) error {
return common.CancellableWrite(ctx, oc.ch, item)
}

// Recv receives an item from the channel and records the duration taken to do so.
// It also updates the current size of the channel buffer.
// This method blocks until an item is available.
// Recv receives an item from the channel and records the duration taken to do
// so. It also updates the current size of the channel buffer. This method
// blocks until an item is available.
func (oc *ObservableChan[T]) Recv() T {
v, _ := oc.RecvCtx(context.Background())
return v
}

// RecvCtx receives an item from the channel with context and records the duration taken to do so.
// It also updates the current size of the channel buffer and supports context cancellation.
// If an error occurs, it logs the error.
// RecvCtx receives an item from the channel with context and records the
// duration taken to do so. It also updates the current size of the channel
// buffer and supports context cancellation. If an error occurs, it logs the
// error.
func (oc *ObservableChan[T]) RecvCtx(ctx context.Context) (T, error) {
defer func(start time.Time) {
oc.metrics.RecordConsumeDuration(time.Since(start))
oc.RecordChannelLen()
}(time.Now())

return common.CancellableRecv(ctx, oc.ch)
return common.CancellableRead(ctx, oc.ch)
}

// RecordChannelCapacity records the capacity of the channel buffer.
Expand Down
19 changes: 19 additions & 0 deletions pkg/channelmetrics/observablechan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,22 @@ func TestObservableChan_Close(t *testing.T) {

mockMetrics.AssertExpectations(t)
}

func TestObservableChanClosed(t *testing.T) {
t.Parallel()

ch := make(chan int)
close(ch)
oc := NewObservableChan(ch, nil)

ctx, cancel := context.WithCancel(context.Background())
// Closed channel should return with an error.
v, err := oc.RecvCtx(ctx)
assert.Error(t, err)
assert.Equal(t, 0, v)

// Cancelled context should also return with an error.
cancel()
_, err = oc.RecvCtx(ctx)
assert.Error(t, err)
}
23 changes: 17 additions & 6 deletions pkg/common/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ package common

import "context"

// ChannelClosedErr indicates that a read was performed from a closed channel.
type ChannelClosedErr struct{}

func (ChannelClosedErr) Error() string { return "channel is closed" }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️


func IsDone(ctx context.Context) bool {
select {
case <-ctx.Done():
Expand All @@ -13,7 +18,8 @@ func IsDone(ctx context.Context) bool {

// CancellableWrite blocks on writing the item to the channel but can be
// cancelled by the context. If both the context is cancelled and the channel
// write would succeed, either operation will be performed randomly.
// write would succeed, either operation will be performed randomly, however
// priority is given to context cancellation.
func CancellableWrite[T any](ctx context.Context, ch chan<- T, item T) error {
select {
case <-ctx.Done(): // priority to context cancellation
Expand All @@ -28,10 +34,12 @@ func CancellableWrite[T any](ctx context.Context, ch chan<- T, item T) error {
}
}

// CancellableRecv blocks on receiving an item from the channel but can be
// cancelled by the context. If both the context is cancelled and the channel
// read would succeed, either operation will be performed randomly.
func CancellableRecv[T any](ctx context.Context, ch <-chan T) (T, error) {
// CancellableRead blocks on receiving an item from the channel but can be
// cancelled by the context. If the channel is closed, a ChannelClosedErr is
// returned. If both the context is cancelled and the channel read would
// succeed, either operation will be performed randomly, however priority is
// given to context cancellation.
func CancellableRead[T any](ctx context.Context, ch <-chan T) (T, error) {
var zero T // zero value of type T

select {
Expand All @@ -41,7 +49,10 @@ func CancellableRecv[T any](ctx context.Context, ch <-chan T) (T, error) {
select {
case <-ctx.Done():
return zero, ctx.Err()
case item := <-ch:
case item, ok := <-ch:
if !ok {
return item, ChannelClosedErr{}
}
return item, nil
}
}
Expand Down
Loading