Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjust consumer creation to nats-server v2.9.0 #1080

Merged
merged 7 commits into from
Sep 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 70 additions & 78 deletions js.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,19 @@ const (
apiAccountInfo = "INFO"

// apiConsumerCreateT is used to create consumers.
apiConsumerCreateT = "CONSUMER.CREATE.%s"
// it accepts stream name and consumer name.
apiConsumerCreateT = "CONSUMER.CREATE.%s.%s"

// apiConsumerCreateT is used to create consumers.
// it accepts stream name, consumer name and filter subject
apiConsumerCreateWithFilterSubjectT = "CONSUMER.CREATE.%s.%s.%s"

// apiLegacyConsumerCreateT is used to create consumers.
// this is a legacy endpoint to support creating ephemerals before nats-server v2.9.0.
apiLegacyConsumerCreateT = "CONSUMER.CREATE.%s"

// apiDurableCreateT is used to create durable consumers.
// this is a legacy endpoint to support creating durable consumers before nats-server v2.9.0.
apiDurableCreateT = "CONSUMER.DURABLE.CREATE.%s.%s"

// apiConsumerInfoT is used to create consumers.
Expand Down Expand Up @@ -245,6 +255,9 @@ type jsOpts struct {
directGet bool
// For direct get next message
directNextFor string

// featureFlags are used to enable/disable specific JetStream features
featureFlags featureFlags
}

const (
Expand Down Expand Up @@ -284,6 +297,20 @@ func (opt jsOptFn) configureJSContext(opts *jsOpts) error {
return opt(opts)
}

type featureFlags struct {
useDurableConsumerCreate bool
}

// UseLegacyDurableConsumers makes JetStream use the legacy (pre nats-server v2.9.0) subjects for consumer creation.
// If this option is used when creating JetStremContext, $JS.API.CONSUMER.DURABLE.CREATE.<stream>.<consumer> will be used
// to create a consumer with Durable provided, rather than $JS.API.CONSUMER.CREATE.<stream>.<consumer>.
func UseLegacyDurableConsumers() JSOpt {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like a good compromise to me, allows people to opt out of a feature should they be in a situation where server updates isnt possible.

Of course in other projects this is opt in -> opt out -> remove flag over 3 releases. But seems this is the best we can do at present.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main issue with this is that the default client behavior is still to use the new feature. So really they have to change code, so they could also hold upgrading the server or the client. If there's a flag, my guess is it should be opt-in, rather than opt-out.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My preference would be an opt-in also.

To your point "they could also hold upgrading the server"....this is just not the case, our default answer to any support question is "upgrade the server", users might not be in a position to cange code etc, hence why I prefer opt-in.

Copy link
Member

@wallyqs wallyqs Sep 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the way these type of changes are rolled out is to wait for a few releases before making the default if there is no alternative, I like the feature flags approach from the PR, maybe we could have eventually an option for the JetStream context to always use latest features instead or to enable a subset of them.

nc.JetStream(nats.UseLatestJSFeatures()) // all opt-in
nc.JetStream(nats.UseLegacyDurableConsumers()) // opt-out 
nc.JetStream(&nats.JetStreamFeatures{LegacyDurableConsumers: true, AllowDirect: true})

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UseLatestJSFeatures() would be a nice addition in the future for sure. I would probably lean towards merging this PR as is (with having the opt-out option if needed, to be deprecated and removed in future releases), not to change the behavior vs other clients right now. If we decide we want to have future features like this opt-in, we need to be consistent across all clients. As some of the clients are already released without a feature flag, I don't think it's good to have this discrepancy.

return jsOptFn(func(opts *jsOpts) error {
opts.featureFlags.useDurableConsumerCreate = true
return nil
})
}

// ClientTrace can be used to trace API interactions for the JetStream Context.
type ClientTrace struct {
RequestSent func(subj string, payload []byte)
Expand Down Expand Up @@ -1031,6 +1058,7 @@ func (d nakDelay) configureAck(opts *ackOpts) error {
// ConsumerConfig is the configuration of a JetStream consumer.
type ConsumerConfig struct {
Durable string `json:"durable_name,omitempty"`
Name string `json:"name,omitempty"`
Description string `json:"description,omitempty"`
DeliverPolicy DeliverPolicy `json:"deliver_policy"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
Expand Down Expand Up @@ -1621,95 +1649,59 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,

// If we are creating or updating let's process that request.
if shouldCreate {
j, err := json.Marshal(ccreq)
info, err := js.upsertConsumer(stream, cfg.Durable, ccreq.Config)
if err != nil {
cleanUpSub()
return nil, err
}

var ccSubj string
if isDurable {
ccSubj = js.apiSubj(fmt.Sprintf(apiDurableCreateT, stream, cfg.Durable))
} else {
ccSubj = js.apiSubj(fmt.Sprintf(apiConsumerCreateT, stream))
}

if js.opts.shouldTrace {
ctrace := js.opts.ctrace
if ctrace.RequestSent != nil {
ctrace.RequestSent(ccSubj, j)
}
}
resp, err := nc.Request(ccSubj, j, js.opts.wait)
if err != nil {
cleanUpSub()
if err == ErrNoResponders {
err = ErrJetStreamNotEnabled
var apiErr *APIError
if ok := errors.As(err, &apiErr); !ok {
cleanUpSub()
return nil, err
}
return nil, err
}
if js.opts.shouldTrace {
ctrace := js.opts.ctrace
if ctrace.ResponseReceived != nil {
ctrace.ResponseReceived(ccSubj, resp.Data, resp.Header)
if consumer == _EMPTY_ ||
(apiErr.ErrorCode != JSErrCodeConsumerAlreadyExists && apiErr.ErrorCode != JSErrCodeConsumerNameExists) {
cleanUpSub()
if errors.Is(apiErr, ErrStreamNotFound) {
return nil, ErrStreamNotFound
}
return nil, err
}
}

var cinfo consumerResponse
err = json.Unmarshal(resp.Data, &cinfo)
if err != nil {
cleanUpSub()
return nil, err
}
info = cinfo.ConsumerInfo

if cinfo.Error != nil {
// We will not be using this sub here if we were push based.
if !isPullMode {
cleanUpSub()
}
if consumer != _EMPTY_ &&
(cinfo.Error.ErrorCode == JSErrCodeConsumerAlreadyExists || cinfo.Error.ErrorCode == JSErrCodeConsumerNameExists) {

info, err = js.ConsumerInfo(stream, consumer)
if err != nil {
return nil, err
}
deliver, err = processConsInfo(info, o.cfg, isPullMode, subj, queue)
if err != nil {
return nil, err
}
info, err = js.ConsumerInfo(stream, consumer)
if err != nil {
return nil, err
}
deliver, err = processConsInfo(info, o.cfg, isPullMode, subj, queue)
if err != nil {
return nil, err
}

if !isPullMode {
// We can't reuse the channel, so if one was passed, we need to create a new one.
if isSync {
ch = make(chan *Msg, cap(ch))
} else if ch != nil {
// User provided (ChanSubscription), simply try to drain it.
for done := false; !done; {
select {
case <-ch:
default:
done = true
}
if !isPullMode {
// We can't reuse the channel, so if one was passed, we need to create a new one.
if isSync {
ch = make(chan *Msg, cap(ch))
} else if ch != nil {
// User provided (ChanSubscription), simply try to drain it.
for done := false; !done; {
select {
case <-ch:
default:
done = true
}
}
jsi.deliver = deliver
jsi.hbi = info.Config.Heartbeat

// Recreate the subscription here.
sub, err = nc.subscribe(jsi.deliver, queue, cb, ch, isSync, jsi)
if err != nil {
return nil, err
}
hasFC = info.Config.FlowControl
hasHeartbeats = info.Config.Heartbeat > 0
}
} else {
if errors.Is(cinfo.Error, ErrStreamNotFound) {
return nil, ErrStreamNotFound
jsi.deliver = deliver
jsi.hbi = info.Config.Heartbeat

// Recreate the subscription here.
sub, err = nc.subscribe(jsi.deliver, queue, cb, ch, isSync, jsi)
if err != nil {
return nil, err
}
return nil, cinfo.Error
hasFC = info.Config.FlowControl
hasHeartbeats = info.Config.Heartbeat > 0
}
} else {
// Since the library created the JS consumer, it will delete it on Unsubscribe()/Drain()
Expand Down Expand Up @@ -1960,7 +1952,7 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) {
cfg.DeliverPolicy = DeliverByStartSequencePolicy
cfg.OptStartSeq = sseq

ccSubj := fmt.Sprintf(apiConsumerCreateT, jsi.stream)
ccSubj := fmt.Sprintf(apiLegacyConsumerCreateT, jsi.stream)
j, err := json.Marshal(jsi.ccreq)
js := jsi.js
sub.mu.Unlock()
Expand Down
8 changes: 4 additions & 4 deletions jserrors.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ var (
// ErrStreamNotFound is an error returned when stream with given name does not exist.
ErrStreamNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeStreamNotFound, Description: "stream not found", Code: 404}}

// ErrStreamNameAlreadyInUse is returned when a stream with given name already exists and has a different configuration
// ErrStreamNameAlreadyInUse is returned when a stream with given name already exists and has a different configuration.
ErrStreamNameAlreadyInUse JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeStreamNameInUse, Description: "stream name already in use", Code: 400}}

// ErrConsumerNotFound is an error returned when consumer with given name does not exist.
Expand Down Expand Up @@ -59,7 +59,7 @@ var (
// ErrStreamNameRequired is returned when the provided stream name is empty.
ErrStreamNameRequired JetStreamError = &jsError{message: "stream name is required"}

// ErrConsumerNameRequired is returned when the provided consumer durable name is empty,
// ErrConsumerNameRequired is returned when the provided consumer durable name is empty.
ErrConsumerNameRequired JetStreamError = &jsError{message: "consumer name is required"}

// ErrConsumerConfigRequired is returned when empty consumer consuguration is supplied to add/update consumer.
Expand Down Expand Up @@ -98,8 +98,8 @@ var (
// ErrCantAckIfConsumerAckNone is returned when attempting to ack a message for consumer with AckNone policy set.
ErrCantAckIfConsumerAckNone JetStreamError = &jsError{message: "cannot acknowledge a message for a consumer with AckNone policy"}

// DEPRECATED: ErrInvalidDurableName is no longer returned and will be removed in future releases
// Use ErrInvalidConsumerName instead
// DEPRECATED: ErrInvalidDurableName is no longer returned and will be removed in future releases.
// Use ErrInvalidConsumerName instead.
ErrInvalidDurableName = errors.New("nats: invalid durable name")
)

Expand Down
45 changes: 32 additions & 13 deletions jsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,34 +259,45 @@ type consumerResponse struct {

// AddConsumer will add a JetStream consumer.
func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) {
if cfg != nil && cfg.Durable != _EMPTY_ {
consInfo, err := js.ConsumerInfo(stream, cfg.Durable)
if cfg == nil {
cfg = &ConsumerConfig{}
}
consumerName := cfg.Name
if consumerName == _EMPTY_ {
consumerName = cfg.Durable
}
if consumerName != _EMPTY_ {
consInfo, err := js.ConsumerInfo(stream, consumerName)
if err != nil && !errors.Is(err, ErrConsumerNotFound) && !errors.Is(err, ErrStreamNotFound) {
return nil, err
}

if consInfo != nil {
sameConfig := checkConfig(&consInfo.Config, cfg)
if sameConfig != nil {
return nil, fmt.Errorf("%w: creating consumer %q on stream %q", ErrConsumerNameAlreadyInUse, cfg.Durable, stream)
return nil, fmt.Errorf("%w: creating consumer %q on stream %q", ErrConsumerNameAlreadyInUse, consumerName, stream)
}
}
}

return js.upsertConsumer(stream, cfg, opts...)
return js.upsertConsumer(stream, consumerName, cfg, opts...)
}

func (js *js) UpdateConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) {
if cfg == nil {
return nil, ErrConsumerConfigRequired
}
if cfg.Durable == _EMPTY_ {
consumerName := cfg.Name
if consumerName == _EMPTY_ {
consumerName = cfg.Durable
}
if consumerName == _EMPTY_ {
return nil, ErrConsumerNameRequired
}
return js.upsertConsumer(stream, cfg, opts...)
return js.upsertConsumer(stream, consumerName, cfg, opts...)
}

func (js *js) upsertConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) {
func (js *js) upsertConsumer(stream, consumerName string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) {
if err := checkStreamName(stream); err != nil {
return nil, err
}
Expand All @@ -304,13 +315,21 @@ func (js *js) upsertConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt)
}

var ccSubj string
if cfg != nil && cfg.Durable != _EMPTY_ {
if err := checkConsumerName(cfg.Durable); err != nil {
return nil, err
}
ccSubj = fmt.Sprintf(apiDurableCreateT, stream, cfg.Durable)
if consumerName == _EMPTY_ {
// if consumer name is empty, use the legacy ephemeral endpoint
ccSubj = fmt.Sprintf(apiLegacyConsumerCreateT, stream)
} else if err := checkConsumerName(consumerName); err != nil {
return nil, err
} else if !js.nc.serverMinVersion(2, 9, 0) || (cfg.Durable != "" && js.opts.featureFlags.useDurableConsumerCreate) {
// if server version is lower than 2.9.0 or user set the useDurableConsumerCreate flag, use the legacy DURABLE.CREATE endpoint
ccSubj = fmt.Sprintf(apiDurableCreateT, stream, consumerName)
} else {
ccSubj = fmt.Sprintf(apiConsumerCreateT, stream)
// if above server version 2.9.0, use the endpoints with consumer name
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If not checking the server name, what I think should be done is just in checking if the new config Name is specified. If it is, then that signals the intent to use the new API endpoints. If user specifies Name, but connects to an older server, they will get a timeout: this is not ideal, but I don't think checking server version is reliable anyway (could be connected to one 2.9.0, but server accepting the request be older version, or vice versa).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, I'll change it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, just to clarify - when user provides only Durable, we should use the old endpoints (CONSUMER.DURABLE.CREATE)?
If so, that makes it problematic for e.g. Subscribe("foo", nats.Durable("cons")), as basically it will always use old API subject, unless we add another SubOpt for just setting consumer name.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should then possibly set the Name to the Durable before calling js.AddConsumer(). But then, I agree that we have the situation where this is not a user choice and so connecting to older server would be a problem...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So from what I understand, we have following options:

  1. Not verify the server version compatibility and call the new API if Name is set - problem here is, Subscribe() will always call the old API when combined with Bind()/Durable() options
  2. Do as above, but set Name to the value of Durable in Subscribe() - that means however that we would always be using new API, even for older servers (that's not an option)
  3. Verify server version when choosing the right subject in upsertConsumer() - here, we might have an issue of connecting to an older server, as you described in the original comment.

Looking at those, I would still lean towards option 3 - I could strip version check in AddConsumer()/UpdateConsumer(), but leave the version check when choosing the right subject in upsertConsuper(). What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The pure version orientated approach means users do not get a chance to say they need time to update infrastructure like ACLs and so forth to start using it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, not sure how big of an issue that would be vs forcing the user to intentionally start using the new API by changing their application code (IMO this transition should ideally be seamless, as the user of client library I don't need to care about the API subjects). What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a class of user who care deeply to lock down the APIs according to needs. Also a class of user who have spent considerable time in navigating the API subjects for cross account use via imports and exports.

A major design feature of these API subjects is to enable that lock down. Or to be selectively imported to manage permissions and restrictions.

The Venn diagrams of users likely to pay for NATS and those who care deeply for subject security probsbly has quite a lot of overlap.

So you might not care, but I think we should consider if introducing new features in the most user hostile way possible is perhaps not the right thing - especially considering the users most likely affected.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue with the ACL is possibly real, but in reality, the same logic can be applied the other way. If they deploy the new client but don't update the servers, they can update their ACLs and then deploy the servers.

The client possibly could have a way of rejecting the use of the new API (I do for test purposes). But at some point, the clients become too complex and too flexible.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The question is how do users know? Do we have effective communication channels to tell people about these changes? Release notes and blog post are notoriously ineffective - especially as ours tend to be enormous.

So do we feel users are served well enough by the communication and warning we give them about upcoming changes?

In another world these would be considered breaking changes and tooling and just human behaviour is aware of major changes. These are not just new features. They majorly change existing code simply because it happens to point at another version server.

As much as I loathe the go major version change behaviour this does demonstrate the utility of that.

if cfg.FilterSubject == _EMPTY_ || cfg.FilterSubject == ">" {
ccSubj = fmt.Sprintf(apiConsumerCreateT, stream, consumerName)
} else {
ccSubj = fmt.Sprintf(apiConsumerCreateWithFilterSubjectT, stream, consumerName, cfg.FilterSubject)
}
}

resp, err := js.apiRequestWithContext(o.ctx, js.apiSubj(ccSubj), req)
Expand Down
Loading