Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IMPROVED] Secure consumer create #3409

Merged
merged 1 commit into from
Aug 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 25 additions & 9 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ type ConsumerInfo struct {
}

type ConsumerConfig struct {
// Durable is deprecated. All consumers will have names. picked by clients.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is still deprecated...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could soften if you like..

Durable string `json:"durable_name,omitempty"`
Name string `json:"name,omitempty"`
derekcollison marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of adding a new property to check, why not simply just say all consumers are durable. And that durables can be reaped by the server based on their inactive_threshold. Server can still autoname "ephemerals" when name is empty.

This would simplify the logic on clients, at the expense that durable_name would always be set, and name in ConsumerInfo would duplicate it.

Copy link
Member

@aricart aricart Aug 29, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only other concern then becomes how to reflect the new use of the API. If the consumer specifies a filter in the config, and the server matches the version with the new API, the client could simply use the new API. Otherwise, it can use the old API, allowing security compliance of the $JS.API.CONSUMER.CREATE... API without having to deal with model changes.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Durables could not be reaped, so I prefer to not change that behavior and leave as is for existing clients.

Going forward all consumers will have a name and optional inactive threshold.
Note also that consumer info already had Name, so it now simply matches that for new style.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why omitempty if All consumers will have names?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's good hygiene for older consumers and clients since it does not introduce this on the wire in those situations.

Description string `json:"description,omitempty"`
DeliverPolicy DeliverPolicy `json:"deliver_policy"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
Expand Down Expand Up @@ -353,10 +355,6 @@ func setConsumerConfigDefaults(config *ConsumerConfig, lim *JSLimitOpts, accLim
}
}

func (mset *stream) addConsumer(config *ConsumerConfig) (*consumer, error) {
return mset.addConsumerWithAssignment(config, _EMPTY_, nil, false)
}

// Check the consumer config. If we are recovering don't check filter subjects.
func checkConsumerCfg(
config *ConsumerConfig,
Expand Down Expand Up @@ -530,6 +528,10 @@ func checkConsumerCfg(
return nil
}

func (mset *stream) addConsumer(config *ConsumerConfig) (*consumer, error) {
return mset.addConsumerWithAssignment(config, _EMPTY_, nil, false)
}

func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname string, ca *consumerAssignment, isRecovering bool) (*consumer, error) {
mset.mu.RLock()
s, jsa, tierName, cfg, acc := mset.srv, mset.jsa, mset.tier, mset.cfg, mset.acc
Expand Down Expand Up @@ -589,8 +591,14 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
}

// If this one is durable and already exists, we let that be ok as long as only updating what should be allowed.
var cName string
if isDurableConsumer(config) {
if eo, ok := mset.consumers[config.Durable]; ok {
cName = config.Durable
} else if config.Name != _EMPTY_ {
cName = config.Name
}
derekcollison marked this conversation as resolved.
Show resolved Hide resolved
if cName != _EMPTY_ {
if eo, ok := mset.consumers[cName]; ok {
mset.mu.Unlock()
err := eo.updateConfig(config)
if err == nil {
Expand Down Expand Up @@ -674,11 +682,17 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
} else if oname != _EMPTY_ {
o.name = oname
} else {
for {
o.name = createConsumerName()
if _, ok := mset.consumers[o.name]; !ok {
break
if config.Name != _EMPTY_ {
o.name = config.Name
} else {
// Legacy ephemeral auto-generated.
for {
o.name = createConsumerName()
if _, ok := mset.consumers[o.name]; !ok {
break
}
}
config.Name = o.name
}
}
// Create ackMsgs queue now that we have a consumer name
Expand Down Expand Up @@ -768,6 +782,8 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
// Ephemerals will always have inactive thresholds.
// Add in 1 sec of jitter above and beyond the default of 5s.
o.dthresh = JsDeleteWaitTimeDefault + time.Duration(rand.Int63n(1000))*time.Millisecond
// Only stamp config with default sans jitter.
o.cfg.InactiveThreshold = JsDeleteWaitTimeDefault
kozlovic marked this conversation as resolved.
Show resolved Hide resolved
}

if o.isPushMode() {
Expand Down
12 changes: 11 additions & 1 deletion server/errors.json
Original file line number Diff line number Diff line change
Expand Up @@ -1288,5 +1288,15 @@
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSConsumerCreateFilterSubjectMismatchErr",
"code": 400,
"error_code": 10131,
"description": "Consumer create request did not match filtered subject from create subject",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
}
]
]
2 changes: 2 additions & 0 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1487,6 +1487,8 @@ func TestFileStoreMeta(t *testing.T) {
if err := json.Unmarshal(buf, &oconfig2); err != nil {
t.Fatalf("Error unmarshalling: %v", err)
}
// Since we set name we will get that back now.
oconfig.Name = oname
if !reflect.DeepEqual(oconfig2, oconfig) {
t.Fatalf("Consumer configs not equal, got %+v vs %+v", oconfig2, oconfig)
}
Expand Down
129 changes: 89 additions & 40 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,14 @@ const (
// jsDirectGetPre
jsDirectGetPre = "$JS.API.DIRECT.GET"

// JSApiConsumerCreate is the endpoint to create ephemeral consumers for streams.
// JSApiConsumerCreate is the endpoint to create consumers for streams.
// This was also the legacy endpoint for ephemeral consumers.
// It now can take consumer name and optional filter subject, which when part of the subject controls access.
// Will return JSON response.
JSApiConsumerCreate = "$JS.API.CONSUMER.CREATE.*"
JSApiConsumerCreateT = "$JS.API.CONSUMER.CREATE.%s"
JSApiConsumerCreate = "$JS.API.CONSUMER.CREATE.*"
JSApiConsumerCreateT = "$JS.API.CONSUMER.CREATE.%s"
JSApiConsumerCreateEx = "$JS.API.CONSUMER.CREATE.*.>"
derekcollison marked this conversation as resolved.
Show resolved Hide resolved
JSApiConsumerCreateExT = "$JS.API.CONSUMER.CREATE.%s.%s.%s"

// JSApiDurableCreate is the endpoint to create durable consumers for streams.
// You need to include the stream and consumer name in the subject.
Expand Down Expand Up @@ -832,8 +836,9 @@ func (s *Server) setJetStreamExportSubs() error {
{JSApiConsumerLeaderStepDown, s.jsConsumerLeaderStepDownRequest},
{JSApiMsgDelete, s.jsMsgDeleteRequest},
{JSApiMsgGet, s.jsMsgGetRequest},
{JSApiConsumerCreateEx, s.jsConsumerCreateRequest},
{JSApiConsumerCreate, s.jsConsumerCreateRequest},
{JSApiDurableCreate, s.jsDurableCreateRequest},
{JSApiDurableCreate, s.jsConsumerCreateRequest},
{JSApiConsumers, s.jsConsumerNamesRequest},
{JSApiConsumerList, s.jsConsumerListRequest},
{JSApiConsumerInfo, s.jsConsumerInfoRequest},
Expand Down Expand Up @@ -3609,17 +3614,19 @@ done:
mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, nil, nil, 0))
}

// Request to create a durable consumer.
func (s *Server) jsDurableCreateRequest(sub *subscription, c *client, acc *Account, subject, reply string, msg []byte) {
s.jsConsumerCreate(sub, c, acc, subject, reply, msg, true)
}
// For determining consumer request type.
type ccReqType uint8

// Request to create a consumer.
func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, acc *Account, subject, reply string, msg []byte) {
s.jsConsumerCreate(sub, c, acc, subject, reply, msg, false)
}
const (
ccNew = iota
ccLegacyEphemeral
ccLegacyDurable
)

func (s *Server) jsConsumerCreate(sub *subscription, c *client, a *Account, subject, reply string, rmsg []byte, expectDurable bool) {
// Request to create a consumer where stream and optional consumer name are part of the subject, and optional
// filtered subjects can be at the tail end.
// Assumes stream and consumer names are single tokens.
func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Account, subject, reply string, rmsg []byte) {
if c == nil || !s.JetStreamEnabled() {
return
}
Expand All @@ -3632,13 +3639,6 @@ func (s *Server) jsConsumerCreate(sub *subscription, c *client, a *Account, subj

var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}}

var streamName string
if expectDurable {
streamName = tokenAt(subject, 6)
} else {
streamName = tokenAt(subject, 5)
}

var req CreateConsumerRequest
if err := json.Unmarshal(msg, &req); err != nil {
resp.Error = NewJSInvalidJSONError()
Expand All @@ -3653,7 +3653,7 @@ func (s *Server) jsConsumerCreate(sub *subscription, c *client, a *Account, subj
if isClustered {
if req.Config.Direct {
// Check to see if we have this stream and are the stream leader.
if !acc.JetStreamIsStreamLeader(streamName) {
if !acc.JetStreamIsStreamLeader(streamNameFromSubject(subject)) {
return
}
} else {
Expand All @@ -3674,21 +3674,64 @@ func (s *Server) jsConsumerCreate(sub *subscription, c *client, a *Account, subj
}
}

var streamName, consumerName, filteredSubject string
var rt ccReqType

if n := numTokens(subject); n < 5 {
s.Warnf(badAPIRequestT, msg)
return
} else if n == 5 {
// Legacy ephemeral.
rt = ccLegacyEphemeral
streamName = streamNameFromSubject(subject)
} else {
// New style and durable legacy.
if tokenAt(subject, 4) == "DURABLE" {
rt = ccLegacyDurable
if n != 7 {
resp.Error = NewJSConsumerDurableNameNotInSubjectError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
streamName = tokenAt(subject, 6)
consumerName = tokenAt(subject, 7)
} else {
streamName = streamNameFromSubject(subject)
consumerName = consumerNameFromSubject(subject)
}
// New has optional filtered subject as part of main subject..
if n > 7 {
tokens := strings.Split(subject, tsep)
filteredSubject = strings.Join(tokens[6:], tsep)
}
}

if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = NewJSNotEnabledForAccountError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
}
return
}

if streamName != req.Stream {
resp.Error = NewJSStreamMismatchError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}

if expectDurable {
if numTokens(subject) != 7 {
if consumerName != _EMPTY_ {
// Check for path like separators in the name.
if strings.ContainsAny(consumerName, `\/`) {
resp.Error = NewJSConsumerNameContainsPathSeparatorsError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
}

// Should we expect a durable name
if rt == ccLegacyDurable {
if numTokens(subject) < 7 {
resp.Error = NewJSConsumerDurableNameNotInSubjectError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
Expand All @@ -3699,30 +3742,36 @@ func (s *Server) jsConsumerCreate(sub *subscription, c *client, a *Account, subj
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
consumerName := tokenAt(subject, 7)
if consumerName != req.Config.Durable {
resp.Error = NewJSConsumerDurableNameNotMatchSubjectError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// Check for path like separators in the name.
if strings.ContainsAny(consumerName, `\/`) {
resp.Error = NewJSConsumerNameContainsPathSeparatorsError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}

} else {
if numTokens(subject) != 5 {
resp.Error = NewJSConsumerEphemeralWithDurableInSubjectError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
}
// If new style and durable set make sure they match.
if rt == ccNew {
if req.Config.Durable != _EMPTY_ {
resp.Error = NewJSConsumerEphemeralWithDurableNameError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
if consumerName != req.Config.Durable {
resp.Error = NewJSConsumerDurableNameNotMatchSubjectError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
}
// New style ephemeral so we need to honor the name.
req.Config.Name = consumerName
}
// Check for legacy ephemeral mis-configuration.
if rt == ccLegacyEphemeral && req.Config.Durable != _EMPTY_ {
resp.Error = NewJSConsumerEphemeralWithDurableNameError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}

// Check for a filter subject.
if filteredSubject != _EMPTY_ && req.Config.FilterSubject != filteredSubject {
resp.Error = NewJSConsumerCreateFilterSubjectMismatchError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}

if isClustered && !req.Config.Direct {
Expand Down
14 changes: 14 additions & 0 deletions server/jetstream_errors_generated.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ const (
// JSConsumerCreateErrF General consumer creation failure string ({err})
JSConsumerCreateErrF ErrorIdentifier = 10012

// JSConsumerCreateFilterSubjectMismatchErr Consumer create request did not match filtered subject from create subject
JSConsumerCreateFilterSubjectMismatchErr ErrorIdentifier = 10131

// JSConsumerDeliverCycleErr consumer deliver subject forms a cycle
JSConsumerDeliverCycleErr ErrorIdentifier = 10081

Expand Down Expand Up @@ -411,6 +414,7 @@ var (
JSConsumerBadDurableNameErr: {Code: 400, ErrCode: 10103, Description: "durable name can not contain '.', '*', '>'"},
JSConsumerConfigRequiredErr: {Code: 400, ErrCode: 10078, Description: "consumer config required"},
JSConsumerCreateErrF: {Code: 500, ErrCode: 10012, Description: "{err}"},
JSConsumerCreateFilterSubjectMismatchErr: {Code: 400, ErrCode: 10131, Description: "Consumer create request did not match filtered subject from create subject"},
JSConsumerDeliverCycleErr: {Code: 400, ErrCode: 10081, Description: "consumer deliver subject forms a cycle"},
JSConsumerDeliverToWildcardsErr: {Code: 400, ErrCode: 10079, Description: "consumer deliver subject has wildcards"},
JSConsumerDescriptionTooLongErrF: {Code: 400, ErrCode: 10107, Description: "consumer description is too long, maximum allowed is {max}"},
Expand Down Expand Up @@ -721,6 +725,16 @@ func NewJSConsumerCreateError(err error, opts ...ErrorOption) *ApiError {
}
}

// NewJSConsumerCreateFilterSubjectMismatchError creates a new JSConsumerCreateFilterSubjectMismatchErr error: "Consumer create request did not match filtered subject from create subject"
func NewJSConsumerCreateFilterSubjectMismatchError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}

return ApiErrors[JSConsumerCreateFilterSubjectMismatchErr]
}

// NewJSConsumerDeliverCycleError creates a new JSConsumerDeliverCycleErr error: "consumer deliver subject forms a cycle"
func NewJSConsumerDeliverCycleError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
Expand Down
Loading