-
Notifications
You must be signed in to change notification settings - Fork 700
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[IMPROVED] Add JetStream error codes, extract ErrConsumerNameAlreadyInUse #1044
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -156,9 +156,9 @@ type ExternalStream struct { | |
|
||
// APIError is included in all API responses if there was an error. | ||
type APIError struct { | ||
Code int `json:"code"` | ||
ErrorCode int `json:"err_code"` | ||
Description string `json:"description,omitempty"` | ||
Code int `json:"code"` | ||
ErrorCode ErrorCode `json:"err_code"` | ||
Description string `json:"description,omitempty"` | ||
} | ||
|
||
// apiResponse is a standard response from the JetStream JSON API | ||
|
@@ -219,6 +219,27 @@ type accountInfoResponse struct { | |
AccountInfo | ||
} | ||
|
||
type ErrorCode uint16 | ||
|
||
const ( | ||
JSErrCodeJetStreamNotEnabledForAccount ErrorCode = 10039 | ||
JSErrCodeJetStreamNotEnabled ErrorCode = 10076 | ||
|
||
JSErrCodeStreamNotFound ErrorCode = 10059 | ||
JSErrCodeStreamNameInUse ErrorCode = 10058 | ||
|
||
JSErrCodeConsumerNotFound ErrorCode = 10014 | ||
JSErrCodeConsumerNameExists ErrorCode = 10013 | ||
JSErrCodeConsumerAlreadyExists ErrorCode = 10105 | ||
|
||
JSErrCodeMessageNotFound ErrorCode = 10037 | ||
) | ||
|
||
// Error prints the JetStream API error code and description | ||
func (e *APIError) Error() string { | ||
return fmt.Sprintf("nats: API error %d: %s", e.ErrorCode, e.Description) | ||
} | ||
|
||
// AccountInfo retrieves info about the JetStream usage from the current account. | ||
// If JetStream is not enabled, this will return ErrJetStreamNotEnabled | ||
// Other errors can happen but are generally considered retryable | ||
|
@@ -244,13 +265,13 @@ func (js *js) AccountInfo(opts ...JSOpt) (*AccountInfo, error) { | |
return nil, err | ||
} | ||
if info.Error != nil { | ||
var err error | ||
if strings.Contains(info.Error.Description, "not enabled for") { | ||
err = ErrJetStreamNotEnabled | ||
} else { | ||
err = errors.New(info.Error.Description) | ||
if info.Error.ErrorCode == JSErrCodeJetStreamNotEnabledForAccount { | ||
return nil, ErrJetStreamNotEnabledForAccount | ||
} | ||
return nil, err | ||
if info.Error.ErrorCode == JSErrCodeJetStreamNotEnabled { | ||
return nil, ErrJetStreamNotEnabled | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the server not enabled for account and not enabled looks like they are two different errors actually... maybe we should start distinguishing them too and use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Another factor why maybe we should clean it up is that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, distinguishing these errors is a good idea. |
||
} | ||
return nil, info.Error | ||
} | ||
|
||
return &info.AccountInfo, nil | ||
|
@@ -268,6 +289,34 @@ type consumerResponse struct { | |
|
||
// AddConsumer will add a JetStream consumer. | ||
func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) { | ||
if cfg != nil && cfg.Durable != _EMPTY_ { | ||
consInfo, err := js.ConsumerInfo(stream, cfg.Durable) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this is good to call Info before, but also changes the behavior slightly since need to give permissions to info in across account cases which they probably would have anyway. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, I would lean towards leaving that, as without that |
||
if err != nil && !errors.Is(err, ErrConsumerNotFound) && !errors.Is(err, ErrStreamNotFound) { | ||
return nil, err | ||
} | ||
|
||
if consInfo != nil { | ||
sameConfig := checkConfig(&consInfo.Config, cfg) | ||
if sameConfig != nil { | ||
return nil, fmt.Errorf("%w: creating consumer %q on stream %q", ErrConsumerNameAlreadyInUse, cfg.Durable, stream) | ||
} | ||
} | ||
} | ||
|
||
return js.upsertConsumer(stream, cfg, opts...) | ||
} | ||
|
||
func (js *js) UpdateConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) { | ||
if cfg == nil { | ||
return nil, ErrConsumerConfigRequired | ||
} | ||
if cfg.Durable == _EMPTY_ { | ||
return nil, ErrConsumerNameRequired | ||
} | ||
return js.upsertConsumer(stream, cfg, opts...) | ||
} | ||
|
||
func (js *js) upsertConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) { | ||
if err := checkStreamName(stream); err != nil { | ||
return nil, err | ||
} | ||
|
@@ -286,7 +335,7 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*C | |
|
||
var ccSubj string | ||
if cfg != nil && cfg.Durable != _EMPTY_ { | ||
if err := checkDurName(cfg.Durable); err != nil { | ||
if err := checkConsumerName(cfg.Durable); err != nil { | ||
return nil, err | ||
} | ||
ccSubj = fmt.Sprintf(apiDurableCreateT, stream, cfg.Durable) | ||
|
@@ -307,30 +356,17 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*C | |
return nil, err | ||
} | ||
if info.Error != nil { | ||
if info.Error.ErrorCode == 10059 { | ||
if info.Error.ErrorCode == JSErrCodeStreamNotFound { | ||
return nil, ErrStreamNotFound | ||
} | ||
if info.Error.Code == 404 { | ||
if info.Error.ErrorCode == JSErrCodeConsumerNotFound { | ||
return nil, ErrConsumerNotFound | ||
} | ||
return nil, errors.New(info.Error.Description) | ||
return nil, info.Error | ||
} | ||
return info.ConsumerInfo, nil | ||
} | ||
|
||
func (js *js) UpdateConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) { | ||
if err := checkStreamName(stream); err != nil { | ||
return nil, err | ||
} | ||
if cfg == nil { | ||
return nil, ErrConsumerConfigRequired | ||
} | ||
if cfg.Durable == _EMPTY_ { | ||
return nil, ErrInvalidDurableName | ||
} | ||
return js.AddConsumer(stream, cfg, opts...) | ||
} | ||
|
||
// consumerDeleteResponse is the response for a Consumer delete request. | ||
type consumerDeleteResponse struct { | ||
apiResponse | ||
|
@@ -347,6 +383,8 @@ func checkStreamName(stream string) error { | |
return nil | ||
} | ||
|
||
// Check that the durable name exists and is valid, that is, that it does not contain any "." | ||
// Returns ErrConsumerNameRequired if consumer name is empty, ErrInvalidConsumerName is invalid, otherwise nil | ||
func checkConsumerName(consumer string) error { | ||
if consumer == _EMPTY_ { | ||
return ErrConsumerNameRequired | ||
|
@@ -384,10 +422,10 @@ func (js *js) DeleteConsumer(stream, consumer string, opts ...JSOpt) error { | |
} | ||
|
||
if resp.Error != nil { | ||
if resp.Error.Code == 404 { | ||
if resp.Error.ErrorCode == JSErrCodeConsumerNotFound { | ||
return ErrConsumerNotFound | ||
} | ||
return errors.New(resp.Error.Description) | ||
return resp.Error | ||
} | ||
return nil | ||
} | ||
|
@@ -474,7 +512,7 @@ func (c *consumerLister) Next() bool { | |
return false | ||
} | ||
if resp.Error != nil { | ||
c.err = errors.New(resp.Error.Description) | ||
c.err = resp.Error | ||
return false | ||
} | ||
|
||
|
@@ -571,7 +609,7 @@ func (c *consumerNamesLister) Next() bool { | |
return false | ||
} | ||
if resp.Error != nil { | ||
c.err = errors.New(resp.Error.Description) | ||
c.err = resp.Error | ||
return false | ||
} | ||
|
||
|
@@ -655,10 +693,10 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) { | |
return nil, err | ||
} | ||
if resp.Error != nil { | ||
if resp.Error.ErrorCode == 10058 { | ||
if resp.Error.ErrorCode == JSErrCodeStreamNameInUse { | ||
return nil, ErrStreamNameAlreadyInUse | ||
} | ||
return nil, errors.New(resp.Error.Description) | ||
return nil, resp.Error | ||
} | ||
|
||
return resp.StreamInfo, nil | ||
|
@@ -703,10 +741,10 @@ func (js *js) StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) { | |
return nil, err | ||
} | ||
if resp.Error != nil { | ||
if resp.Error.Code == 404 { | ||
if resp.Error.ErrorCode == JSErrCodeStreamNotFound { | ||
return nil, ErrStreamNotFound | ||
} | ||
return nil, fmt.Errorf("nats: %s", resp.Error.Description) | ||
return nil, resp.Error | ||
} | ||
|
||
return resp.StreamInfo, nil | ||
|
@@ -795,7 +833,10 @@ func (js *js) UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error | |
return nil, err | ||
} | ||
if resp.Error != nil { | ||
return nil, errors.New(resp.Error.Description) | ||
if resp.Error.ErrorCode == JSErrCodeStreamNotFound { | ||
return nil, ErrStreamNotFound | ||
} | ||
return nil, resp.Error | ||
} | ||
return resp.StreamInfo, nil | ||
} | ||
|
@@ -830,10 +871,10 @@ func (js *js) DeleteStream(name string, opts ...JSOpt) error { | |
} | ||
|
||
if resp.Error != nil { | ||
if resp.Error.Code == 404 { | ||
if resp.Error.ErrorCode == JSErrCodeStreamNotFound { | ||
return ErrStreamNotFound | ||
} | ||
return errors.New(resp.Error.Description) | ||
return resp.Error | ||
} | ||
return nil | ||
} | ||
|
@@ -934,10 +975,13 @@ func (js *js) getMsg(name string, mreq *apiMsgGetRequest, opts ...JSOpt) (*RawSt | |
return nil, err | ||
} | ||
if resp.Error != nil { | ||
if resp.Error.Code == 404 && strings.Contains(resp.Error.Description, "message") { | ||
if resp.Error.ErrorCode == JSErrCodeMessageNotFound { | ||
return nil, ErrMsgNotFound | ||
} | ||
return nil, fmt.Errorf("nats: %s", resp.Error.Description) | ||
if resp.Error.ErrorCode == JSErrCodeStreamNotFound { | ||
return nil, ErrStreamNotFound | ||
} | ||
return nil, resp.Error | ||
} | ||
|
||
msg := resp.Message | ||
|
@@ -1082,7 +1126,7 @@ func (js *js) deleteMsg(ctx context.Context, stream string, req *msgDeleteReques | |
return err | ||
} | ||
if resp.Error != nil { | ||
return errors.New(resp.Error.Description) | ||
return resp.Error | ||
} | ||
return nil | ||
} | ||
|
@@ -1148,7 +1192,7 @@ func (js *js) purgeStream(stream string, req *StreamPurgeRequest, opts ...JSOpt) | |
if resp.Error.Code == 400 { | ||
return fmt.Errorf("%w: %s", ErrBadRequest, "invalid purge request body") | ||
} | ||
return errors.New(resp.Error.Description) | ||
return resp.Error | ||
} | ||
return nil | ||
} | ||
|
@@ -1215,7 +1259,7 @@ func (s *streamLister) Next() bool { | |
return false | ||
} | ||
if resp.Error != nil { | ||
s.err = errors.New(resp.Error.Description) | ||
s.err = resp.Error | ||
return false | ||
} | ||
|
||
|
@@ -1299,7 +1343,7 @@ func (l *streamNamesLister) Next() bool { | |
return false | ||
} | ||
if resp.Error != nil { | ||
l.err = errors.New(resp.Error.Description) | ||
l.err = resp.Error | ||
return false | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this change behavior to support the ephemerals?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, pull ephemerals are already supported. This change is introduced because
nats.Durable()
should not accept empty consumer name (or elsecheckConsumerName()
will fail)