Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow direct get by subject to be all subject based. #3325

Merged
merged 1 commit into from
Aug 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ const (
JSDirectMsgGet = "$JS.API.DIRECT.GET.*"
JSDirectMsgGetT = "$JS.API.DIRECT.GET.%s"

// This is a direct version of get last by subject, which will be the dominant pattern for KV access once 2.9 is released.
// The stream and the key will be part of the subject to allow for no-marshal payloads and subject based security permissions.
JSDirectGetLastBySubject = "$JS.API.DIRECT.GET.*.>"
JSDirectGetLastBySubjectT = "$JS.API.DIRECT.GET.%s.%s"

// jsDirectGetPre
jsDirectGetPre = "$JS.API.DIRECT.GET"

Expand Down
103 changes: 103 additions & 0 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18967,3 +18967,106 @@ func TestJetStreamMirrorUpdatesNotSupported(t *testing.T) {
t.Fatalf("Expected error %q, got %q", NewJSStreamMirrorNotUpdatableError(), err)
}
}

func TestJetStreamDirectGetBySubject(t *testing.T) {
conf := createConfFile(t, []byte(`
listen: 127.0.0.1:-1
jetstream: {max_mem_store: 64GB, max_file_store: 10TB}

ONLYME = {
publish = { allow = "$JS.API.DIRECT.GET.KV.vid.22.>"}
}

accounts: {
A: {
jetstream: enabled
users: [
{ user: admin, password: s3cr3t },
{ user: user, password: pwd, permissions: $ONLYME},
]
},
}
`))
defer removeFile(t, conf)

s, _ := RunServerWithConfig(conf)
if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}
defer s.Shutdown()

nc, js := jsClientConnect(t, s, nats.UserInfo("admin", "s3cr3t"))
defer nc.Close()

// Do by hand for now.
cfg := &StreamConfig{
Name: "KV",
Storage: MemoryStorage,
Subjects: []string{"vid.*.>"},
MaxMsgsPer: 1,
AllowDirect: true,
}
addStream(t, nc, cfg)

// Add in mirror as well.
cfg = &StreamConfig{
Name: "M",
Storage: MemoryStorage,
Mirror: &StreamSource{Name: "KV"},
MirrorDirect: true,
}
addStream(t, nc, cfg)

v22 := "vid.22.speed"
v33 := "vid.33.speed"
_, err := js.Publish(v22, []byte("100"))
require_NoError(t, err)
_, err = js.Publish(v33, []byte("55"))
require_NoError(t, err)

// User the restricted user.
nc, _ = jsClientConnect(t, s, nats.UserInfo("user", "pwd"))
defer nc.Close()

errCh := make(chan error, 10)
nc.SetErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, e error) {
select {
case errCh <- e:
default:
}
})

getSubj := fmt.Sprintf(JSDirectGetLastBySubjectT, "KV", v22)
m, err := nc.Request(getSubj, nil, time.Second)
require_NoError(t, err)
require_True(t, string(m.Data) == "100")

// Now attempt to access vid 33 data..
getSubj = fmt.Sprintf(JSDirectGetLastBySubjectT, "KV", v33)
_, err = nc.Request(getSubj, nil, 200*time.Millisecond)
require_Error(t, err) // timeout here.

select {
case e := <-errCh:
if !strings.HasPrefix(e.Error(), "nats: Permissions Violation") {
t.Fatalf("Expected a permissions violation but got %v", e)
}
case <-time.After(time.Second):
t.Fatalf("Expected to get a permissions error, got none")
}

// Now make sure mirrors are doing right thing with new way as well.
var sawMirror bool
getSubj = fmt.Sprintf(JSDirectGetLastBySubjectT, "KV", v22)
for i := 0; i < 100; i++ {
m, err := nc.Request(getSubj, nil, time.Second)
require_NoError(t, err)
if shdr := m.Header.Get(JSStream); shdr == "M" {
sawMirror = true
break
}
}
if !sawMirror {
t.Fatalf("Expected to see the mirror respond at least once")
}
}
130 changes: 108 additions & 22 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ type stream struct {

// Direct get subscription.
directSub *subscription
lastBySub *subscription
}

type sourceInfo struct {
Expand All @@ -245,6 +246,7 @@ type sourceInfo struct {
cname string
sub *subscription
dsub *subscription
lbsub *subscription
msgs *ipQueue // of *inMsg
sseq uint64
dseq uint64
Expand Down Expand Up @@ -1433,14 +1435,20 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)
}

// Check for mirror. If set but we are not a mirror just ignore for now.
if mset.cfg.MirrorDirect && mset.cfg.Mirror != nil && mset.mirror.dsub == nil {
if mset.cfg.MirrorDirect && mset.cfg.Mirror != nil {
if err := mset.subscribeToMirrorDirect(); err != nil {
// Disable since we had problems above.
mset.cfg.MirrorDirect = false
}
} else if !mset.cfg.MirrorDirect && mset.mirror != nil && mset.mirror.dsub != nil {
mset.unsubscribe(mset.mirror.dsub)
mset.mirror.dsub = nil
} else if !mset.cfg.MirrorDirect && mset.mirror != nil {
if mset.mirror.dsub != nil {
mset.unsubscribe(mset.mirror.dsub)
mset.mirror.dsub = nil
}
if mset.mirror.lbsub != nil {
mset.unsubscribe(mset.mirror.lbsub)
mset.mirror.lbsub = nil
}
}
mset.mu.Unlock()

Expand Down Expand Up @@ -2910,31 +2918,56 @@ func (mset *stream) subscribeToStream() error {

// Lock should be held.
func (mset *stream) subscribeToDirect() error {
if mset.directSub != nil {
return nil
}
dsubj := fmt.Sprintf(JSDirectMsgGetT, mset.cfg.Name)
// We will make this listen on a queue group by default, which can allow mirrors to participate on opt-in basis.
if sub, err := mset.queueSubscribeInternal(dsubj, dgetGroup, mset.processDirectGetRequest); err != nil {
return err
} else {
mset.directSub = sub
if mset.directSub == nil {
dsubj := fmt.Sprintf(JSDirectMsgGetT, mset.cfg.Name)
if sub, err := mset.queueSubscribeInternal(dsubj, dgetGroup, mset.processDirectGetRequest); err == nil {
mset.directSub = sub
} else {
return err
}
}
// Now the one that will have subject appended past stream name.
if mset.lastBySub == nil {
dsubj := fmt.Sprintf(JSDirectGetLastBySubjectT, mset.cfg.Name, fwcs)
// We will make this listen on a queue group by default, which can allow mirrors to participate on opt-in basis.
if sub, err := mset.queueSubscribeInternal(dsubj, dgetGroup, mset.processDirectGetLastBySubjectRequest); err == nil {
mset.lastBySub = sub
} else {
return err
}
}

return nil
}

// Lock should be held.
func (mset *stream) subscribeToMirrorDirect() error {
if mset.mirror == nil || mset.mirror.dsub != nil {
if mset.mirror == nil {
return nil
}
dsubj := fmt.Sprintf(JSDirectMsgGetT, mset.mirror.name)

// We will make this listen on a queue group by default, which can allow mirrors to participate on opt-in basis.
if sub, err := mset.queueSubscribeInternal(dsubj, dgetGroup, mset.processDirectGetRequest); err != nil {
return err
} else {
mset.mirror.dsub = sub
if mset.mirror.dsub == nil {
dsubj := fmt.Sprintf(JSDirectMsgGetT, mset.mirror.name)
// We will make this listen on a queue group by default, which can allow mirrors to participate on opt-in basis.
if sub, err := mset.queueSubscribeInternal(dsubj, dgetGroup, mset.processDirectGetRequest); err == nil {
mset.mirror.dsub = sub
} else {
return err
}
}
// Now the one that will have subject appended past stream name.
if mset.mirror.lbsub == nil {
dsubj := fmt.Sprintf(JSDirectGetLastBySubjectT, mset.mirror.name, fwcs)
// We will make this listen on a queue group by default, which can allow mirrors to participate on opt-in basis.
if sub, err := mset.queueSubscribeInternal(dsubj, dgetGroup, mset.processDirectGetLastBySubjectRequest); err == nil {
mset.mirror.lbsub = sub
} else {
return err
}
}

return nil
}

Expand Down Expand Up @@ -2969,10 +3002,16 @@ func (mset *stream) unsubscribeToStream(stopping bool) error {
mset.stopSourceConsumers()
}

// In case we had a direct get subscription.
if mset.directSub != nil && stopping {
mset.unsubscribe(mset.directSub)
mset.directSub = nil
// In case we had a direct get subscriptions.
if stopping {
if mset.directSub != nil {
mset.unsubscribe(mset.directSub)
mset.directSub = nil
}
if mset.lastBySub != nil {
mset.unsubscribe(mset.lastBySub)
mset.lastBySub = nil
}
}

mset.active = false
Expand Down Expand Up @@ -3333,6 +3372,53 @@ func (mset *stream) processDirectGetRequest(_ *subscription, c *client, _ *Accou
}
}

// This is for direct get by last subject which is part of the subject itself.
func (mset *stream) processDirectGetLastBySubjectRequest(_ *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So what will happen to the other versions of direct get? Meaning as of now, the Go client is using a nats.DirectGet() or nats.DirectGetNext() option with optional sequence number to get a message at a specific sequence number. Will that go away? Since those were added very recently and not yet released, I would like to know if we pull them out, etc..

As for the kv implementation, as I previously said, the KV API has a GetRevision() API that retrieves a key at a given sequence:

	if kv.useDirect {
		_opts[0] = DirectGet()
		opts = _opts[:1]
	}
	if revision == kvLatestRevision {
		m, err = kv.js.GetLastMsg(kv.stream, b.String(), opts...)
	} else {
		m, err = kv.js.GetMsg(kv.stream, revision, opts...)
		// If a sequence was provided, just make sure that the retrieved
		// message subject matches the request.
		if err == nil && m.Subject != b.String() {
			return nil, ErrKeyNotFound
		}
	}

So with the change in the server, I could see how we would modify GetLastMsg() in case of direct to append the subject and not have a payload at all, but what about the other cases and the JSM management get message APIs.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current direct get that takes a request body will continue to work as is today.

I would expect clients to optimize and use this method when they know they are just looking up last value by subject/key as it avoids a marshal and can be subject to security permissions, but the current direct get will continue to work.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, so I read that we should try to optimize the clients GetLastMsg() when nats.DirectGet() option is provided to simply add the subject at the end, and not send a marshal request.

_, msg := c.msgParts(rmsg)
if len(reply) == 0 {
return
}
// This version expects no payload.
if len(msg) != 0 {
hdr := []byte("NATS/1.0 408 Bad Request\r\n\r\n")
mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
return
}
// Extract the key.
var key string
for i, n := 0, 0; i < len(subject); i++ {
if subject[i] == btsep {
if n == 4 {
if start := i + 1; start < len(subject) {
key = subject[i+1:]
}
break
}
n++
}
}
if len(key) == 0 {
hdr := []byte("NATS/1.0 408 Bad Request\r\n\r\n")
mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
return
}

inlineOk := c.kind != ROUTER && c.kind != GATEWAY && c.kind != LEAF
if !inlineOk {
// Check how long we have been away from the readloop for the route or gateway or leafnode.
// If too long move to a separate go routine.
if elapsed := time.Since(c.in.start); elapsed < noBlockThresh {
inlineOk = true
}
}

if inlineOk {
mset.getDirectRequest(&JSApiMsgGetRequest{LastFor: key}, reply)
} else {
go mset.getDirectRequest(&JSApiMsgGetRequest{LastFor: key}, reply)
}
}

// Do actual work on a direct msg request.
// This could be called in a Go routine if we are inline for a non-client connection.
func (mset *stream) getDirectRequest(req *JSApiMsgGetRequest, reply string) {
Expand Down