diff --git a/server/client.go b/server/client.go index e98c605470..b44745c497 100644 --- a/server/client.go +++ b/server/client.go @@ -4375,6 +4375,12 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver, continue } + // If we are a spoke leaf node make sure to not forward across routes. + // This mimics same behavior for normal subs above. + if c.kind == LEAF && c.isSpokeLeafNode() && sub.client.kind == ROUTER { + continue + } + // We have taken care of preferring local subs for a message from a route above. // Here we just care about a client or leaf and skipping a leaf and preferring locals. if dst := sub.client.kind; dst == ROUTER || dst == LEAF { diff --git a/server/leafnode_test.go b/server/leafnode_test.go index e6577fd373..f29cea5e58 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -5561,3 +5561,78 @@ func TestLeafNodeWithWeightedDQResponsesWithStreamImportAccountsWithUnsub(t *tes closeSubs(rsubs) checkFor(t, time.Second, 200*time.Millisecond, checkInterest) } + +// https://github.com/nats-io/nats-server/issues/4367 +func TestLeafNodeDQMultiAccountExportImport(t *testing.T) { + bConf := createConfFile(t, []byte(` + listen: 127.0.0.1:-1 + server_name: cluster-b-0 + accounts { + $SYS: { users: [ { user: admin, password: pwd } ] }, + AGG: { + exports: [ { service: "PING.>" } ] + users: [ { user: agg, password: agg } ] + } + } + leaf { listen: 127.0.0.1:-1 } + `)) + + sb, ob := RunServerWithConfig(bConf) + defer sb.Shutdown() + + tmpl := ` + listen: 127.0.0.1:-1 + server_name: %s + jetstream: { store_dir: '%s' } + cluster { + name: %s + listen: 127.0.0.1:%d + routes = [%s] + } + accounts { + $SYS: { users: [ { user: admin, password: pwd } ] }, + A: { + mappings: { "A.>" : ">" } + exports: [ { service: A.> } ] + users: [ { user: a, password: a } ] + }, + AGG: { + imports: [ { service: { subject: A.>, account: A } } ] + users: [ { user: agg, password: agg } ] + }, + } + leaf { + remotes: [ { + urls: [ nats-leaf://agg:agg@127.0.0.1:{LEAF_PORT} ] + account: AGG + } ] + } + ` + tmpl = strings.Replace(tmpl, "{LEAF_PORT}", fmt.Sprintf("%d", ob.LeafNode.Port), 1) + c := createJetStreamCluster(t, tmpl, "cluster-a", "cluster-a-", 3, 22110, false) + defer c.shutdown() + + // Make sure all servers are connected via leafnode to the hub, the b server. + for _, s := range c.servers { + checkLeafNodeConnectedCount(t, s, 1) + } + + // Connect to a server in the cluster and create a DQ listener. + nc, _ := jsClientConnect(t, c.randomServer(), nats.UserInfo("a", "a")) + defer nc.Close() + + var got atomic.Int32 + + natsQueueSub(t, nc, "PING", "Q", func(m *nats.Msg) { + got.Add(1) + m.Respond([]byte("REPLY")) + }) + + // Now connect to B and send the request. + ncb, _ := jsClientConnect(t, sb, nats.UserInfo("agg", "agg")) + defer ncb.Close() + + _, err := ncb.Request("A.PING", []byte("REQUEST"), time.Second) + require_NoError(t, err) + require_Equal(t, got.Load(), 1) +}