Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Remove ConsumerInfo() calls in Consume() and Messages() after reconnect. #1643

Merged
merged 2 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions go_test.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,20 @@ go 1.19

require (
github.com/golang/protobuf v1.4.2
github.com/klauspost/compress v1.17.6
github.com/klauspost/compress v1.17.8
github.com/nats-io/jwt v1.2.2
github.com/nats-io/nats-server/v2 v2.10.11
github.com/nats-io/nats-server/v2 v2.10.16
github.com/nats-io/nkeys v0.4.7
github.com/nats-io/nuid v1.0.1
go.uber.org/goleak v1.3.0
golang.org/x/text v0.14.0
golang.org/x/text v0.15.0
google.golang.org/protobuf v1.23.0
)

require (
github.com/minio/highwayhash v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.5.3 // indirect
golang.org/x/crypto v0.19.0 // indirect
golang.org/x/sys v0.17.0 // indirect
github.com/nats-io/jwt/v2 v2.5.7 // indirect
golang.org/x/crypto v0.23.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/time v0.5.0 // indirect
)
24 changes: 12 additions & 12 deletions go_test.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/klauspost/compress v1.17.6 h1:60eq2E/jlfwQXtvZEeBUYADs+BwKBWURIY+Gj2eRGjI=
github.com/klauspost/compress v1.17.6/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU=
github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q=
github.com/nats-io/jwt/v2 v2.5.3 h1:/9SWvzc6hTfamcgXJ3uYRpgj+QuY2aLNqRiqrKcrpEo=
github.com/nats-io/jwt/v2 v2.5.3/go.mod h1:iysuPemFcc7p4IoYots3IuELSI4EDe9Y0bQMe+I3Bf4=
github.com/nats-io/nats-server/v2 v2.10.11 h1:yKUiLVincZISpo3A4YljJQ+HfLltGAgoNNJl99KL8I0=
github.com/nats-io/nats-server/v2 v2.10.11/go.mod h1:dXtOqVWzbMTEj+tUyC/itXjJhW37xh0tUBrTAlqAfx8=
github.com/nats-io/jwt/v2 v2.5.7 h1:j5lH1fUXCnJnY8SsQeB/a/z9Azgu2bYIDvtPVNdxe2c=
github.com/nats-io/jwt/v2 v2.5.7/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
github.com/nats-io/nats-server/v2 v2.10.16 h1:2jXaiydp5oB/nAx/Ytf9fdCi9QN6ItIc9eehX8kwVV0=
github.com/nats-io/nats-server/v2 v2.10.16/go.mod h1:Pksi38H2+6xLe1vQx0/EA4bzetM0NqyIHcIbmgXSkIU=
github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
Expand All @@ -31,17 +31,17 @@ go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo=
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI=
golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk=
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
Expand Down
195 changes: 156 additions & 39 deletions jetstream/ordered.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type (
stopAfterMsgsLeft chan int
withStopAfter bool
runningFetch *fetchResult
subscription *orderedSubscription
sync.Mutex
}

Expand Down Expand Up @@ -92,7 +93,8 @@ func (c *orderedConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt
return nil, fmt.Errorf("%w: %s", ErrInvalidOption, err)
}
c.userErrHandler = consumeOpts.ErrHandler
opts = append(opts, ConsumeErrHandler(c.errHandler(c.serial)))
opts = append(opts, consumeReconnectNotify(),
ConsumeErrHandler(c.errHandler(c.serial)))
if consumeOpts.StopAfter > 0 {
c.withStopAfter = true
c.stopAfter = consumeOpts.StopAfter
Expand All @@ -105,6 +107,7 @@ func (c *orderedConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt
consumer: c,
done: make(chan struct{}, 1),
}
c.subscription = sub
internalHandler := func(serial int) func(msg Msg) {
return func(msg Msg) {
// handler is a noop if message was delivered for a consumer with different serial
Expand Down Expand Up @@ -197,13 +200,13 @@ func (c *orderedConsumer) errHandler(serial int) func(cc ConsumeContext, err err
return func(cc ConsumeContext, err error) {
c.Lock()
defer c.Unlock()
if c.userErrHandler != nil && !errors.Is(err, errOrderedSequenceMismatch) {
if c.userErrHandler != nil && !errors.Is(err, errOrderedSequenceMismatch) && !errors.Is(err, errConnected) {
c.userErrHandler(cc, err)
}
if errors.Is(err, ErrNoHeartbeat) ||
errors.Is(err, errOrderedSequenceMismatch) ||
errors.Is(err, ErrConsumerDeleted) ||
errors.Is(err, ErrConsumerNotFound) {
errors.Is(err, errConnected) {
// only reset if serial matches the current consumer serial and there is no reset in progress
if serial == c.serial && atomic.LoadUint32(&c.resetInProgress) == 0 {
atomic.StoreUint32(&c.resetInProgress, 1)
Expand Down Expand Up @@ -235,7 +238,9 @@ func (c *orderedConsumer) Messages(opts ...PullMessagesOpt) (MessagesContext, er
if err != nil {
return nil, fmt.Errorf("%w: %s", ErrInvalidOption, err)
}
opts = append(opts, WithMessagesErrOnMissingHeartbeat(true))
opts = append(opts,
WithMessagesErrOnMissingHeartbeat(true),
messagesReconnectNotify())
c.stopAfterMsgsLeft = make(chan int, 1)
if consumeOpts.StopAfter > 0 {
c.withStopAfter = true
Expand All @@ -255,6 +260,7 @@ func (c *orderedConsumer) Messages(opts ...PullMessagesOpt) (MessagesContext, er
opts: opts,
done: make(chan struct{}, 1),
}
c.subscription = sub

return sub, nil
}
Expand Down Expand Up @@ -367,6 +373,11 @@ func (c *orderedConsumer) Fetch(batch int, opts ...FetchOpt) (MessageBatch, erro
}
c.currentConsumer.Unlock()
c.consumerType = consumerTypeFetch
sub := orderedSubscription{
consumer: c,
done: make(chan struct{}),
}
c.subscription = &sub
err := c.reset()
if err != nil {
return nil, err
Expand Down Expand Up @@ -397,6 +408,11 @@ func (c *orderedConsumer) FetchBytes(maxBytes int, opts ...FetchOpt) (MessageBat
c.cursor.streamSeq = c.runningFetch.sseq
}
c.consumerType = consumerTypeFetch
sub := orderedSubscription{
consumer: c,
done: make(chan struct{}),
}
c.subscription = &sub
err := c.reset()
if err != nil {
return nil, err
Expand Down Expand Up @@ -425,6 +441,11 @@ func (c *orderedConsumer) FetchNoWait(batch int) (MessageBatch, error) {
return nil, ErrOrderedConsumerConcurrentRequests
}
c.consumerType = consumerTypeFetch
sub := orderedSubscription{
consumer: c,
done: make(chan struct{}),
}
c.subscription = &sub
err := c.reset()
if err != nil {
return nil, err
Expand Down Expand Up @@ -481,52 +502,42 @@ func (c *orderedConsumer) reset() error {
}
consName := c.currentConsumer.CachedInfo().Name
c.currentConsumer.Unlock()
var err error
for i := 0; ; i++ {
if c.cfg.MaxResetAttempts > 0 && i == c.cfg.MaxResetAttempts {
return fmt.Errorf("%w: maximum number of delete attempts reached: %s", ErrOrderedConsumerReset, err)
}
go func() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
err = c.jetStream.DeleteConsumer(ctx, c.stream, consName)
_ = c.jetStream.DeleteConsumer(ctx, c.stream, consName)
cancel()
if err != nil {
if errors.Is(err, ErrConsumerNotFound) {
break
}
if errors.Is(err, nats.ErrTimeout) || errors.Is(err, context.DeadlineExceeded) {
continue
}
return err
}
break
}
}()
}

c.cursor.deliverSeq = 0
consumerConfig := c.getConsumerConfig()

var err error
var cons Consumer
for i := 0; ; i++ {
if c.cfg.MaxResetAttempts > 0 && i == c.cfg.MaxResetAttempts {
return fmt.Errorf("%w: maximum number of create consumer attempts reached: %s", ErrOrderedConsumerReset, err)

backoffOpts := backoffOpts{
attempts: c.cfg.MaxResetAttempts,
initialInterval: time.Second,
factor: 2,
maxInterval: 10 * time.Second,
cancel: c.subscription.done,
}
err = retryWithBackoff(func(attempt int) (bool, error) {
isClosed := atomic.LoadUint32(&c.subscription.closed) == 1
if isClosed {
return false, nil
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
cons, err = c.jetStream.CreateOrUpdateConsumer(ctx, c.stream, *consumerConfig)
if err != nil {
if errors.Is(err, ErrConsumerNotFound) {
cancel()
break
}
if errors.Is(err, nats.ErrTimeout) || errors.Is(err, context.DeadlineExceeded) {
cancel()
continue
}
cancel()
return err
return true, err
}
cancel()
break
c.currentConsumer = cons.(*pullConsumer)
return false, nil
}, backoffOpts)
if err != nil {
return err
}
c.currentConsumer = cons.(*pullConsumer)
return nil
Expand All @@ -548,6 +559,10 @@ func (c *orderedConsumer) getConsumerConfig() *ConsumerConfig {
// otherwise, start from the next sequence
nextSeq = c.cursor.streamSeq + 1
}

if c.cfg.MaxResetAttempts == 0 {
c.cfg.MaxResetAttempts = -1
}
name := fmt.Sprintf("%s_%d", c.namePrefix, c.serial)
cfg := &ConsumerConfig{
Name: name,
Expand All @@ -564,6 +579,9 @@ func (c *orderedConsumer) getConsumerConfig() *ConsumerConfig {
} else {
cfg.FilterSubjects = c.cfg.FilterSubjects
}
if c.cfg.InactiveThreshold != 0 {
cfg.InactiveThreshold = c.cfg.InactiveThreshold
}

if c.serial != 1 {
return cfg
Expand All @@ -589,9 +607,6 @@ func (c *orderedConsumer) getConsumerConfig() *ConsumerConfig {
cfg.DeliverPolicy = DeliverByStartTimePolicy
cfg.OptStartTime = c.cfg.OptStartTime
}
if c.cfg.InactiveThreshold != 0 {
cfg.InactiveThreshold = c.cfg.InactiveThreshold
}

return cfg
}
Expand All @@ -612,6 +627,20 @@ func messagesStopAfterNotify(numMsgs int, msgsLeftAfterStop chan int) PullMessag
})
}

func consumeReconnectNotify() PullConsumeOpt {
return pullOptFunc(func(opts *consumeOpts) error {
opts.notifyOnReconnect = true
return nil
})
}

func messagesReconnectNotify() PullMessagesOpt {
return pullOptFunc(func(opts *consumeOpts) error {
opts.notifyOnReconnect = true
return nil
})
}

// Info returns information about the ordered consumer.
// Note that this method will fetch the latest instance of the
// consumer from the server, which can be deleted by the library at any time.
Expand Down Expand Up @@ -652,3 +681,91 @@ func (c *orderedConsumer) CachedInfo() *ConsumerInfo {
}
return c.currentConsumer.info
}

type backoffOpts struct {
// total retry attempts
// -1 for unlimited
attempts int
// initial interval after which first retry will be performed
// defaults to 1s
initialInterval time.Duration
// determines whether first function execution should be performed immediately
disableInitialExecution bool
// multiplier on each attempt
// defaults to 2
factor float64
// max interval between retries
// after reaching this value, all subsequent
// retries will be performed with this interval
// defaults to 1 minute
maxInterval time.Duration
// custom backoff intervals
// if set, overrides all other options except attempts
// if attempts are set, then the last interval will be used
// for all subsequent retries after reaching the limit
customBackoff []time.Duration
// cancel channel
// if set, retry will be canceled when this channel is closed
cancel <-chan struct{}
}

func retryWithBackoff(f func(int) (bool, error), opts backoffOpts) error {
var err error
var shouldContinue bool
// if custom backoff is set, use it instead of other options
if len(opts.customBackoff) > 0 {
if opts.attempts != 0 {
return fmt.Errorf("cannot use custom backoff intervals when attempts are set")
}
for i, interval := range opts.customBackoff {
select {
case <-opts.cancel:
return nil
case <-time.After(interval):
}
shouldContinue, err = f(i)
if !shouldContinue {
return err
}
}
return err
}

// set default options
if opts.initialInterval == 0 {
opts.initialInterval = 1 * time.Second
}
if opts.factor == 0 {
opts.factor = 2
}
if opts.maxInterval == 0 {
opts.maxInterval = 1 * time.Minute
}
if opts.attempts == 0 {
return fmt.Errorf("retry attempts have to be set when not using custom backoff intervals")
}
interval := opts.initialInterval
for i := 0; ; i++ {
if i == 0 && opts.disableInitialExecution {
time.Sleep(interval)
continue
}
shouldContinue, err = f(i)
if !shouldContinue {
return err
}
if opts.attempts > 0 && i >= opts.attempts-1 {
break
}
select {
case <-opts.cancel:
return nil
case <-time.After(interval):
}
interval = time.Duration(float64(interval) * opts.factor)
if interval >= opts.maxInterval {
interval = opts.maxInterval
}
}
return err
}
Loading