Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: use errors.Is for err handling, and use skipped bool #1500

Merged
merged 1 commit into from
Jan 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions jetstream/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func (js *jetStream) KeyValue(ctx context.Context, bucket string) (KeyValue, err
streamName := fmt.Sprintf(kvBucketNameTmpl, bucket)
stream, err := js.Stream(ctx, streamName)
if err != nil {
if err == ErrStreamNotFound {
if errors.Is(err, ErrStreamNotFound) {
err = ErrBucketNotFound
}
return nil, err
Expand Down Expand Up @@ -565,7 +565,7 @@ func (kv *kvs) get(ctx context.Context, key string, revision uint64) (KeyValueEn
}
}
if err != nil {
if err == ErrMsgNotFound {
if errors.Is(err, ErrMsgNotFound) {
err = ErrKeyNotFound
}
return nil, err
Expand Down Expand Up @@ -617,7 +617,7 @@ func (e *kve) Operation() KeyValueOp { return e.op }
func (kv *kvs) Get(ctx context.Context, key string) (KeyValueEntry, error) {
e, err := kv.get(ctx, key, kvLatestRevision)
if err != nil {
if err == ErrKeyDeleted {
if errors.Is(err, ErrKeyDeleted) {
return nil, ErrKeyNotFound
}
return nil, err
Expand All @@ -630,7 +630,7 @@ func (kv *kvs) Get(ctx context.Context, key string) (KeyValueEntry, error) {
func (kv *kvs) GetRevision(ctx context.Context, key string, revision uint64) (KeyValueEntry, error) {
e, err := kv.get(ctx, key, revision)
if err != nil {
if err == ErrKeyDeleted {
if errors.Is(err, ErrKeyDeleted) {
return nil, ErrKeyNotFound
}
return nil, err
Expand Down Expand Up @@ -675,7 +675,7 @@ func (kv *kvs) Create(ctx context.Context, key string, value []byte) (revision u
return v, nil
}

if e, err := kv.get(ctx, key, kvLatestRevision); err == ErrKeyDeleted {
if e, err := kv.get(ctx, key, kvLatestRevision); errors.Is(err, ErrKeyDeleted) {
return kv.Update(ctx, key, value, e.Revision())
}

Expand Down
20 changes: 10 additions & 10 deletions js.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) {
}

if err != nil {
for r, ttl := 0, o.ttl; err == ErrNoResponders && (r < o.rnum || o.rnum < 0); r++ {
for r, ttl := 0, o.ttl; errors.Is(err, ErrNoResponders) && (r < o.rnum || o.rnum < 0); r++ {
// To protect against small blips in leadership changes etc, if we get a no responders here retry.
if o.ctx != nil {
select {
Expand All @@ -567,7 +567,7 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) {
}
}
if err != nil {
if err == ErrNoResponders {
if errors.Is(err, ErrNoResponders) {
err = ErrNoStreamResponse
}
return nil, err
Expand Down Expand Up @@ -1601,7 +1601,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
if consumer != _EMPTY_ && !o.skipCInfo {
info, err = js.ConsumerInfo(stream, consumer)
notFoundErr = errors.Is(err, ErrConsumerNotFound)
lookupErr = err == ErrJetStreamNotEnabled || err == ErrTimeout || err == context.DeadlineExceeded
lookupErr = err == ErrJetStreamNotEnabled || errors.Is(err, ErrTimeout) || errors.Is(err, context.DeadlineExceeded)
}

switch {
Expand Down Expand Up @@ -2829,7 +2829,7 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
// are no messages.
msg, err = sub.nextMsgWithContext(ctx, true, false)
if err != nil {
if err == errNoMessages {
if errors.Is(err, errNoMessages) {
err = nil
}
break
Expand Down Expand Up @@ -2909,13 +2909,13 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
usrMsg, err = checkMsg(msg, true, noWait)
if err == nil && usrMsg {
msgs = append(msgs, msg)
} else if noWait && (err == errNoMessages || err == errRequestsPending) && len(msgs) == 0 {
} else if noWait && (errors.Is(err, errNoMessages) || errors.Is(err, errRequestsPending)) && len(msgs) == 0 {
// If we have a 404/408 for our "no_wait" request and have
// not collected any message, then resend request to
// wait this time.
noWait = false
err = sendReq()
} else if err == ErrTimeout && len(msgs) == 0 {
} else if errors.Is(err, ErrTimeout) && len(msgs) == 0 {
// If we get a 408, we will bail if we already collected some
// messages, otherwise ignore and go back calling nextMsg.
err = nil
Expand Down Expand Up @@ -3098,7 +3098,7 @@ func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, e
// are no messages.
msg, err := sub.nextMsgWithContext(ctx, true, false)
if err != nil {
if err == errNoMessages {
if errors.Is(err, errNoMessages) {
err = nil
}
result.err = err
Expand Down Expand Up @@ -3175,7 +3175,7 @@ func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, e

usrMsg, err = checkMsg(msg, true, false)
if err != nil {
if err == ErrTimeout {
if errors.Is(err, ErrTimeout) {
if reqID != "" && !subjectMatchesReqID(msg.Subject, reqID) {
// ignore timeout message from server if it comes from a different pull request
continue
Expand Down Expand Up @@ -3204,7 +3204,7 @@ func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, e

// checkCtxErr is used to determine whether ErrTimeout should be returned in case of context timeout
func (o *pullOpts) checkCtxErr(err error) error {
if o.ctx == nil && err == context.DeadlineExceeded {
if o.ctx == nil && errors.Is(err, context.DeadlineExceeded) {
return ErrTimeout
}
return err
Expand All @@ -3220,7 +3220,7 @@ func (js *js) getConsumerInfoContext(ctx context.Context, stream, consumer strin
ccInfoSubj := fmt.Sprintf(apiConsumerInfoT, stream, consumer)
resp, err := js.apiRequestWithContext(ctx, js.apiSubj(ccInfoSubj), nil)
if err != nil {
if err == ErrNoResponders {
if errors.Is(err, ErrNoResponders) {
err = ErrJetStreamNotEnabled
}
return nil, err
Expand Down
6 changes: 3 additions & 3 deletions jsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func (js *js) AccountInfo(opts ...JSOpt) (*AccountInfo, error) {
resp, err := js.apiRequestWithContext(o.ctx, js.apiSubj(apiAccountInfo), nil)
if err != nil {
// todo maybe nats server should never have no responder on this subject and always respond if they know there is no js to be had
if err == ErrNoResponders {
if errors.Is(err, ErrNoResponders) {
err = ErrJetStreamNotEnabled
}
return nil, err
Expand Down Expand Up @@ -415,7 +415,7 @@ func (js *js) upsertConsumer(stream, consumerName string, cfg *ConsumerConfig, o

resp, err := js.apiRequestWithContext(o.ctx, js.apiSubj(ccSubj), req)
if err != nil {
if err == ErrNoResponders {
if errors.Is(err, ErrNoResponders) {
err = ErrJetStreamNotEnabled
}
return nil, err
Expand Down Expand Up @@ -1623,7 +1623,7 @@ func (jsc *js) StreamNameBySubject(subj string, opts ...JSOpt) (string, error) {

resp, err := jsc.apiRequestWithContext(o.ctx, jsc.apiSubj(apiStreams), j)
if err != nil {
if err == ErrNoResponders {
if errors.Is(err, ErrNoResponders) {
err = ErrJetStreamNotEnabled
}
return _EMPTY_, err
Expand Down
12 changes: 6 additions & 6 deletions kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ func (js *js) KeyValue(bucket string) (KeyValue, error) {
stream := fmt.Sprintf(kvBucketNameTmpl, bucket)
si, err := js.StreamInfo(stream)
if err != nil {
if err == ErrStreamNotFound {
if errors.Is(err, ErrStreamNotFound) {
err = ErrBucketNotFound
}
return nil, err
Expand Down Expand Up @@ -486,7 +486,7 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) {
// the stream.
// The same logic applies for KVs created pre 2.9.x and
// the AllowDirect setting.
if err == ErrStreamNameAlreadyInUse {
if errors.Is(err, ErrStreamNameAlreadyInUse) {
if si, _ = js.StreamInfo(scfg.Name); si != nil {
// To compare, make the server's stream info discard
// policy same than ours.
Expand Down Expand Up @@ -558,7 +558,7 @@ func keyValid(key string) bool {
func (kv *kvs) Get(key string) (KeyValueEntry, error) {
e, err := kv.get(key, kvLatestRevision)
if err != nil {
if err == ErrKeyDeleted {
if errors.Is(err, ErrKeyDeleted) {
return nil, ErrKeyNotFound
}
return nil, err
Expand All @@ -571,7 +571,7 @@ func (kv *kvs) Get(key string) (KeyValueEntry, error) {
func (kv *kvs) GetRevision(key string, revision uint64) (KeyValueEntry, error) {
e, err := kv.get(key, revision)
if err != nil {
if err == ErrKeyDeleted {
if errors.Is(err, ErrKeyDeleted) {
return nil, ErrKeyNotFound
}
return nil, err
Expand Down Expand Up @@ -608,7 +608,7 @@ func (kv *kvs) get(key string, revision uint64) (KeyValueEntry, error) {
}
}
if err != nil {
if err == ErrMsgNotFound {
if errors.Is(err, ErrMsgNotFound) {
err = ErrKeyNotFound
}
return nil, err
Expand Down Expand Up @@ -675,7 +675,7 @@ func (kv *kvs) Create(key string, value []byte) (revision uint64, err error) {

// TODO(dlc) - Since we have tombstones for DEL ops for watchers, this could be from that
// so we need to double check.
if e, err := kv.get(key, kvLatestRevision); err == ErrKeyDeleted {
if e, err := kv.get(key, kvLatestRevision); errors.Is(err, ErrKeyDeleted) {
return kv.Update(key, value, e.Revision())
}

Expand Down
4 changes: 2 additions & 2 deletions object.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ func (obs *obs) Get(name string, opts ...GetObjectOpt) (ObjectResult, error) {
if ctx != nil {
select {
case <-ctx.Done():
if ctx.Err() == context.Canceled {
if errors.Is(ctx.Err(), context.Canceled) {
err = ctx.Err()
} else {
err = ErrTimeout
Expand Down Expand Up @@ -926,7 +926,7 @@ func (obs *obs) GetInfo(name string, opts ...GetObjectInfoOpt) (*ObjectInfo, err

m, err := obs.js.GetLastMsg(stream, metaSubj)
if err != nil {
if err == ErrMsgNotFound {
if errors.Is(err, ErrMsgNotFound) {
err = ErrObjectNotFound
}
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion timer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type timerPool struct {

// Get returns a timer that completes after the given duration.
func (tp *timerPool) Get(d time.Duration) *time.Timer {
if t, _ := tp.p.Get().(*time.Timer); t != nil {
if t, ok := tp.p.Get().(*time.Timer); ok && t != nil {
t.Reset(d)
return t
}
Expand Down