Skip to content

Commit

Permalink
Fix async pub paf id for new js API
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Pietrek <[email protected]>
  • Loading branch information
Jarema committed Nov 28, 2023
1 parent cc3b44e commit 88b4982
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 13 deletions.
21 changes: 18 additions & 3 deletions jetstream/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,11 @@ type (
JetStreamOpt func(*jsOpts) error

jsOpts struct {
publisherOpts asyncPublisherOpts
apiPrefix string
clientTrace *ClientTrace
publisherOpts asyncPublisherOpts
apiPrefix string
replyPrefix string
replyPrefixLen int
clientTrace *ClientTrace
}

// ClientTrace can be used to trace API interactions for the JetStream Context.
Expand Down Expand Up @@ -229,6 +231,7 @@ func New(nc *nats.Conn, opts ...JetStreamOpt) (JetStream, error) {
maxpa: defaultAsyncPubAckInflight,
},
}
setReplyPrefix(nc, &jsOpts)
for _, opt := range opts {
if err := opt(&jsOpts); err != nil {
return nil, err
Expand All @@ -248,6 +251,16 @@ const (
defaultAsyncPubAckInflight = 4000
)

func setReplyPrefix(nc *nats.Conn, jsOpts *jsOpts) {
jsOpts.replyPrefix = nats.InboxPrefix
if nc.Opts.InboxPrefix != "" {
jsOpts.replyPrefix = nc.Opts.InboxPrefix + "."
}
// Add 1 for the dot separator.
jsOpts.replyPrefixLen = len(jsOpts.replyPrefix) + aReplyTokensize + 1

}

// NewWithAPIPrefix returns a new JetStream instance and sets the API prefix to be used in requests to JetStream API
//
// Available options:
Expand All @@ -261,6 +274,7 @@ func NewWithAPIPrefix(nc *nats.Conn, apiPrefix string, opts ...JetStreamOpt) (Je
maxpa: defaultAsyncPubAckInflight,
},
}
setReplyPrefix(nc, &jsOpts)
for _, opt := range opts {
if err := opt(&jsOpts); err != nil {
return nil, err
Expand Down Expand Up @@ -293,6 +307,7 @@ func NewWithDomain(nc *nats.Conn, domain string, opts ...JetStreamOpt) (JetStrea
maxpa: defaultAsyncPubAckInflight,
},
}
setReplyPrefix(nc, &jsOpts)
for _, opt := range opts {
if err := opt(&jsOpts); err != nil {
return nil, err
Expand Down
15 changes: 5 additions & 10 deletions jetstream/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func (js *jetStream) PublishMsgAsync(m *nats.Msg, opts ...PublishOpt) (PubAckFut
if err != nil {
return nil, fmt.Errorf("nats: error creating async reply handler: %s", err)
}
id = m.Reply[aReplyPreLen:]
id = m.Reply[js.replyPrefixLen:]
paf = &pubAckFuture{msg: m, jsClient: js.publisher, maxRetries: o.retryAttempts, retryWait: o.retryWait}
numPending, maxPending := js.registerPAF(id, paf)

Expand All @@ -282,7 +282,7 @@ func (js *jetStream) PublishMsgAsync(m *nats.Msg, opts ...PublishOpt) (PubAckFut
}
} else {
// when retrying, get the ID from existing reply subject
id = m.Reply[aReplyPreLen:]
id = m.Reply[js.replyPrefixLen:]
}

if err := js.conn.PublishMsg(m); err != nil {
Expand All @@ -295,7 +295,6 @@ func (js *jetStream) PublishMsgAsync(m *nats.Msg, opts ...PublishOpt) (PubAckFut

// For quick token lookup etc.
const (
aReplyPreLen = 14
aReplyTokensize = 6
)

Expand All @@ -309,11 +308,7 @@ func (js *jetStream) newAsyncReply() (string, error) {
for i := 0; i < aReplyTokensize; i++ {
b[i] = rdigits[int(b[i]%base)]
}
inboxPrefix := "_INBOX"
if js.conn.Opts.InboxPrefix != "" {
inboxPrefix = js.conn.Opts.InboxPrefix
}
js.publisher.replyPrefix = fmt.Sprintf("%s.%s.", inboxPrefix, b[:aReplyTokensize])
js.publisher.replyPrefix = fmt.Sprintf("%s%s.", js.replyPrefix, b[:aReplyTokensize])
sub, err := js.conn.Subscribe(fmt.Sprintf("%s*", js.publisher.replyPrefix), js.handleAsyncReply)
if err != nil {
js.publisher.Unlock()
Expand Down Expand Up @@ -341,10 +336,10 @@ func (js *jetStream) newAsyncReply() (string, error) {

// Handle an async reply from PublishAsync.
func (js *jetStream) handleAsyncReply(m *nats.Msg) {
if len(m.Subject) <= aReplyPreLen {
if len(m.Subject) <= js.replyPrefixLen {
return
}
id := m.Subject[aReplyPreLen:]
id := m.Subject[js.replyPrefixLen:]

js.publisher.Lock()

Expand Down

0 comments on commit 88b4982

Please sign in to comment.