Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

js: Add JetStreamError with more API response details #1047

Merged
merged 7 commits into from
Aug 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions js.go
Original file line number Diff line number Diff line change
Expand Up @@ -1693,7 +1693,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
hasHeartbeats = info.Config.Heartbeat > 0
}
} else {
if cinfo.Error.ErrorCode == JSErrCodeStreamNotFound {
if errors.Is(cinfo.Error, ErrStreamNotFound) {
return nil, ErrStreamNotFound
}
return nil, cinfo.Error
Expand Down Expand Up @@ -2772,10 +2772,10 @@ func (js *js) getConsumerInfoContext(ctx context.Context, stream, consumer strin
return nil, err
}
if info.Error != nil {
if info.Error.ErrorCode == JSErrCodeConsumerNotFound {
if errors.Is(info.Error, ErrConsumerNotFound) {
return nil, ErrConsumerNotFound
}
if info.Error.ErrorCode == JSErrCodeStreamNotFound {
if errors.Is(info.Error, ErrStreamNotFound) {
return nil, ErrStreamNotFound
}
return nil, info.Error
Expand Down
185 changes: 185 additions & 0 deletions jserrors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
// Copyright 2020-2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package nats

import (
"errors"
"fmt"
)

var (
// API errors

// ErrJetStreamNotEnabled is an error returned when JetStream is not enabled for an account.
ErrJetStreamNotEnabled JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeJetStreamNotEnabled, Description: "jetstream not enabled", Code: 503}}

// ErrJetStreamNotEnabledForAccount is an error returned when JetStream is not enabled for an account.
ErrJetStreamNotEnabledForAccount JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeJetStreamNotEnabledForAccount, Description: "jetstream not enabled for account", Code: 503}}

// ErrStreamNotFound is an error returned when stream with given name does not exist.
ErrStreamNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeStreamNotFound, Description: "stream not found", Code: 404}}

// ErrStreamNameAlreadyInUse is returned when a stream with given name already exists and has a different configuration
ErrStreamNameAlreadyInUse JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeStreamNameInUse, Description: "stream name already in use", Code: 400}}

// ErrConsumerNotFound is an error returned when consumer with given name does not exist.
ErrConsumerNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerNotFound, Description: "consumer not found", Code: 404}}

// ErrMsgNotFound is returned when message with provided sequence number does npt exist.
ErrMsgNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeMessageNotFound, Description: "message not found", Code: 404}}

// ErrBadRequest is returned when invalid request is sent to JetStream API.
ErrBadRequest JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeBadRequest, Description: "bad request", Code: 400}}

// Client errors

// ErrConsumerNotFound is an error returned when consumer with given name does not exist.
ErrConsumerNameAlreadyInUse JetStreamError = &jsError{message: "consumer name already in use"}

// ErrConsumerNotActive is an error returned when consumer is not active.
ErrConsumerNotActive JetStreamError = &jsError{message: "consumer not active"}

// ErrInvalidJSAck is returned when JetStream ack from message publish is invalid.
ErrInvalidJSAck JetStreamError = &jsError{message: "invalid jetstream publish response"}

// ErrStreamConfigRequired is returned when empty stream configuration is supplied to add/update stream.
ErrStreamConfigRequired JetStreamError = &jsError{message: "stream configuration is required"}

// ErrStreamNameRequired is returned when the provided stream name is empty.
ErrStreamNameRequired JetStreamError = &jsError{message: "stream name is required"}

// ErrConsumerNameRequired is returned when the provided consumer durable name is empty,
ErrConsumerNameRequired JetStreamError = &jsError{message: "consumer name is required"}

// ErrConsumerConfigRequired is returned when empty consumer consuguration is supplied to add/update consumer.
ErrConsumerConfigRequired JetStreamError = &jsError{message: "consumer configuration is required"}

// ErrPullSubscribeToPushConsumer is returned when attempting to use PullSubscribe on push consumer.
ErrPullSubscribeToPushConsumer JetStreamError = &jsError{message: "cannot pull subscribe to push based consumer"}

// ErrPullSubscribeRequired is returned when attempting to use subscribe methods not suitable for pull consumers for pull consumers.
ErrPullSubscribeRequired JetStreamError = &jsError{message: "must use pull subscribe to bind to pull based consumer"}

// ErrMsgAlreadyAckd is returned when attempting to acknowledge message more than once.
ErrMsgAlreadyAckd JetStreamError = &jsError{message: "message was already acknowledged"}

// ErrNoStreamResponse is returned when there is no response from stream (e.g. no responders error).
ErrNoStreamResponse JetStreamError = &jsError{message: "no response from stream"}

// ErrNotJSMessage is returned when attempting to get metadata from non JetStream message .
ErrNotJSMessage JetStreamError = &jsError{message: "not a jetstream message"}

// ErrInvalidStreamName is returned when the provided stream name is invalid (contains '.').
ErrInvalidStreamName JetStreamError = &jsError{message: "invalid stream name"}

// ErrInvalidConsumerName is returned when the provided consumer name is invalid (contains '.').
ErrInvalidConsumerName JetStreamError = &jsError{message: "invalid consumer name"}

// ErrNoMatchingStream is returned when stream lookup by subject is unsuccessful.
ErrNoMatchingStream JetStreamError = &jsError{message: "no stream matches subject"}

// ErrSubjectMismatch is returned when the provided subject does not match consumer's filter subject.
ErrSubjectMismatch JetStreamError = &jsError{message: "subject does not match consumer"}

// ErrContextAndTimeout is returned when attempting to use both context and timeout.
ErrContextAndTimeout JetStreamError = &jsError{message: "context and timeout can not both be set"}

// ErrCantAckIfConsumerAckNone is returned when attempting to ack a message for consumer with AckNone policy set.
ErrCantAckIfConsumerAckNone JetStreamError = &jsError{message: "cannot acknowledge a message for a consumer with AckNone policy"}

// DEPRECATED: ErrInvalidDurableName is no longer returned and will be removed in future releases
// Use ErrInvalidConsumerName instead
ErrInvalidDurableName = errors.New("nats: invalid durable name")
)

// Error code represents JetStream error codes returned by the API
type ErrorCode uint16

const (
JSErrCodeJetStreamNotEnabledForAccount ErrorCode = 10039
JSErrCodeJetStreamNotEnabled ErrorCode = 10076

JSErrCodeStreamNotFound ErrorCode = 10059
JSErrCodeStreamNameInUse ErrorCode = 10058

JSErrCodeConsumerNotFound ErrorCode = 10014
JSErrCodeConsumerNameExists ErrorCode = 10013
JSErrCodeConsumerAlreadyExists ErrorCode = 10105

JSErrCodeMessageNotFound ErrorCode = 10037

JSErrCodeBadRequest ErrorCode = 10003
)

// APIError is included in all API responses if there was an error.
type APIError struct {
Code int `json:"code"`
ErrorCode ErrorCode `json:"err_code"`
Description string `json:"description,omitempty"`
}

// Error prints the JetStream API error code and description
func (e *APIError) Error() string {
return fmt.Sprintf("nats: API error %d: %s", e.ErrorCode, e.Description)
}

// APIError implements the JetStreamError interface.
func (e *APIError) APIError() *APIError {
return e
}

// Is matches against an APIError.
func (e *APIError) Is(err error) bool {
if e == nil {
return false
}
// Extract internal APIError to match against.
var aerr *APIError
ok := errors.As(err, &aerr)
if !ok {
return ok
}
return e.ErrorCode == aerr.ErrorCode
}

// JetStreamError is an error result that happens when using JetStream.
// In case of client-side error, `APIError()` returns nil
type JetStreamError interface {
APIError() *APIError
error
}

type jsError struct {
apiErr *APIError
message string
}

func (err *jsError) APIError() *APIError {
return err.apiErr
}

func (err *jsError) Error() string {
if err.apiErr != nil && err.apiErr.Description != "" {
return err.apiErr.Error()
}
return fmt.Sprintf("nats: %s", err.message)
}

func (err *jsError) Unwrap() error {
Copy link
Member Author

@wallyqs wallyqs Aug 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More info about errors package features of wrapping and Unwrap can be found here:

// Allow matching to embedded APIError in case there is one.
if err.apiErr == nil {
return nil
}
return err.apiErr
}
54 changes: 12 additions & 42 deletions jsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,6 @@ type ExternalStream struct {
DeliverPrefix string `json:"deliver"`
}

// APIError is included in all API responses if there was an error.
type APIError struct {
Code int `json:"code"`
ErrorCode ErrorCode `json:"err_code"`
Description string `json:"description,omitempty"`
}

// apiResponse is a standard response from the JetStream JSON API
type apiResponse struct {
Type string `json:"type"`
Expand Down Expand Up @@ -219,27 +212,6 @@ type accountInfoResponse struct {
AccountInfo
}

type ErrorCode uint16

const (
JSErrCodeJetStreamNotEnabledForAccount ErrorCode = 10039
JSErrCodeJetStreamNotEnabled ErrorCode = 10076

JSErrCodeStreamNotFound ErrorCode = 10059
JSErrCodeStreamNameInUse ErrorCode = 10058

JSErrCodeConsumerNotFound ErrorCode = 10014
JSErrCodeConsumerNameExists ErrorCode = 10013
JSErrCodeConsumerAlreadyExists ErrorCode = 10105

JSErrCodeMessageNotFound ErrorCode = 10037
)

// Error prints the JetStream API error code and description
func (e *APIError) Error() string {
return fmt.Sprintf("nats: API error %d: %s", e.ErrorCode, e.Description)
}

// AccountInfo retrieves info about the JetStream usage from the current account.
// If JetStream is not enabled, this will return ErrJetStreamNotEnabled
// Other errors can happen but are generally considered retryable
Expand All @@ -265,12 +237,10 @@ func (js *js) AccountInfo(opts ...JSOpt) (*AccountInfo, error) {
return nil, err
}
if info.Error != nil {
if info.Error.ErrorCode == JSErrCodeJetStreamNotEnabledForAccount {
// Internally checks based on error code instead of description match.
if errors.Is(info.Error, ErrJetStreamNotEnabledForAccount) {
return nil, ErrJetStreamNotEnabledForAccount
}
if info.Error.ErrorCode == JSErrCodeJetStreamNotEnabled {
return nil, ErrJetStreamNotEnabled
}
return nil, info.Error
}

Expand Down Expand Up @@ -356,10 +326,10 @@ func (js *js) upsertConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt)
return nil, err
}
if info.Error != nil {
if info.Error.ErrorCode == JSErrCodeStreamNotFound {
if errors.Is(info.Error, ErrStreamNotFound) {
return nil, ErrStreamNotFound
}
if info.Error.ErrorCode == JSErrCodeConsumerNotFound {
if errors.Is(info.Error, ErrConsumerNotFound) {
return nil, ErrConsumerNotFound
}
return nil, info.Error
Expand Down Expand Up @@ -422,7 +392,7 @@ func (js *js) DeleteConsumer(stream, consumer string, opts ...JSOpt) error {
}

if resp.Error != nil {
if resp.Error.ErrorCode == JSErrCodeConsumerNotFound {
if errors.Is(resp.Error, ErrConsumerNotFound) {
return ErrConsumerNotFound
}
return resp.Error
Expand Down Expand Up @@ -693,7 +663,7 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) {
return nil, err
}
if resp.Error != nil {
if resp.Error.ErrorCode == JSErrCodeStreamNameInUse {
if errors.Is(resp.Error, ErrStreamNameAlreadyInUse) {
return nil, ErrStreamNameAlreadyInUse
}
return nil, resp.Error
Expand Down Expand Up @@ -741,7 +711,7 @@ func (js *js) StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) {
return nil, err
}
if resp.Error != nil {
if resp.Error.ErrorCode == JSErrCodeStreamNotFound {
if errors.Is(resp.Error, ErrStreamNotFound) {
return nil, ErrStreamNotFound
}
return nil, resp.Error
Expand Down Expand Up @@ -833,7 +803,7 @@ func (js *js) UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error
return nil, err
}
if resp.Error != nil {
if resp.Error.ErrorCode == JSErrCodeStreamNotFound {
if errors.Is(resp.Error, ErrStreamNotFound) {
return nil, ErrStreamNotFound
}
return nil, resp.Error
Expand Down Expand Up @@ -871,7 +841,7 @@ func (js *js) DeleteStream(name string, opts ...JSOpt) error {
}

if resp.Error != nil {
if resp.Error.ErrorCode == JSErrCodeStreamNotFound {
if errors.Is(resp.Error, ErrStreamNotFound) {
return ErrStreamNotFound
}
return resp.Error
Expand Down Expand Up @@ -975,10 +945,10 @@ func (js *js) getMsg(name string, mreq *apiMsgGetRequest, opts ...JSOpt) (*RawSt
return nil, err
}
if resp.Error != nil {
if resp.Error.ErrorCode == JSErrCodeMessageNotFound {
if errors.Is(resp.Error, ErrMsgNotFound) {
return nil, ErrMsgNotFound
}
if resp.Error.ErrorCode == JSErrCodeStreamNotFound {
if errors.Is(resp.Error, ErrStreamNotFound) {
return nil, ErrStreamNotFound
}
return nil, resp.Error
Expand Down Expand Up @@ -1189,7 +1159,7 @@ func (js *js) purgeStream(stream string, req *StreamPurgeRequest, opts ...JSOpt)
return err
}
if resp.Error != nil {
if resp.Error.Code == 400 {
if errors.Is(resp.Error, ErrBadRequest) {
return fmt.Errorf("%w: %s", ErrBadRequest, "invalid purge request body")
}
return resp.Error
Expand Down
Loading