Skip to content

Commit

Permalink
Add in NumSubjects to StreamInfo
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Collison <[email protected]>
  • Loading branch information
derekcollison committed Feb 2, 2022
1 parent 6a3cf0f commit 5da0453
Show file tree
Hide file tree
Showing 4 changed files with 211 additions and 20 deletions.
39 changes: 37 additions & 2 deletions server/filestore.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019-2021 The NATS Authors
// Copyright 2019-2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -89,6 +89,7 @@ type fileStore struct {
aek cipher.AEAD
lmb *msgBlock
blks []*msgBlock
psmc map[string]uint64
hh hash.Hash64
qch chan struct{}
cfs []*consumerFileStore
Expand Down Expand Up @@ -280,6 +281,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
fs := &fileStore{
fcfg: fcfg,
cfg: FileStreamInfo{Created: created, StreamConfig: cfg},
psmc: make(map[string]uint64),
prf: prf,
qch: make(chan struct{}),
}
Expand Down Expand Up @@ -879,6 +881,13 @@ func (fs *fileStore) recoverMsgs() error {
}
fs.state.Msgs += mb.msgs
fs.state.Bytes += mb.bytes
// Walk the fss for this mb and fill in fs.psmc
for subj, ss := range mb.fss {
if len(subj) > 0 {
fs.psmc[subj] += ss.Msgs
}
}

} else {
return err
}
Expand Down Expand Up @@ -988,6 +997,7 @@ func (fs *fileStore) expireMsgsOnRecover() {
mb.msgs--
purged++
// Update fss
fs.removePerSubject(sm.subj)
mb.removeSeqPerSubject(sm.subj, seq)
}

Expand Down Expand Up @@ -1528,6 +1538,11 @@ func (fs *fileStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts in
return err
}

// Adjust top level tracking of per subject msg counts.
if len(subj) > 0 {
fs.psmc[subj]++
}

// Adjust first if needed.
now := time.Unix(0, ts).UTC()
if fs.state.Msgs == 0 {
Expand Down Expand Up @@ -1724,6 +1739,19 @@ func (fs *fileStore) EraseMsg(seq uint64) (bool, error) {
return fs.removeMsg(seq, true, true)
}

// Convenience function to remove per subject tracking at the filestore level.
// Lock should be held.
func (fs *fileStore) removePerSubject(subj string) {
if len(subj) == 0 {
return
}
if n, ok := fs.psmc[subj]; ok && n == 1 {
delete(fs.psmc, subj)
} else if ok {
fs.psmc[subj]--
}
}

// Remove a message, optionally rewriting the mb file.
func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error) {
fsLock := func() {
Expand Down Expand Up @@ -1811,6 +1839,7 @@ func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error
mb.bytes -= msz

// If we are tracking multiple subjects here make sure we update that accounting.
fs.removePerSubject(sm.subj)
mb.removeSeqPerSubject(sm.subj, seq)

var shouldWriteIndex, firstSeqNeedsUpdate bool
Expand Down Expand Up @@ -3455,7 +3484,7 @@ func (fs *fileStore) FastState(state *StreamState) {
}
}
state.Consumers = len(fs.cfs)

state.NumSubjects = len(fs.psmc)
fs.mu.RUnlock()
}

Expand All @@ -3464,6 +3493,7 @@ func (fs *fileStore) State() StreamState {
fs.mu.RLock()
state := fs.state
state.Consumers = len(fs.cfs)
state.NumSubjects = len(fs.psmc)
state.Deleted = nil // make sure.

for _, mb := range fs.blks {
Expand Down Expand Up @@ -3825,6 +3855,7 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint
mb.msgs--
mb.bytes -= rl
// FSS updates.
fs.removePerSubject(sm.subj)
mb.removeSeqPerSubject(sm.subj, seq)
// Check for first message.
if seq == mb.first.seq {
Expand Down Expand Up @@ -3930,6 +3961,9 @@ func (fs *fileStore) purge(fseq uint64) (uint64, error) {

fs.lmb.writeIndexInfo()

// Clear any per subject tracking.
fs.psmc = make(map[string]uint64)

cb := fs.scb
fs.mu.Unlock()

Expand Down Expand Up @@ -3994,6 +4028,7 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
smb.msgs--
purged++
// Update fss
fs.removePerSubject(sm.subj)
smb.removeSeqPerSubject(sm.subj, mseq)
}
}
Expand Down
165 changes: 159 additions & 6 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019-2021 The NATS Authors
// Copyright 2019-2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -8420,7 +8420,7 @@ func TestJetStreamDeleteMsg(t *testing.T) {
t.Fatalf("Expected to get the stream back")
}

expected := StreamState{Msgs: 6, Bytes: 6 * bytesPerMsg, FirstSeq: 12, LastSeq: 20}
expected := StreamState{Msgs: 6, Bytes: 6 * bytesPerMsg, FirstSeq: 12, LastSeq: 20, NumSubjects: 1}
state = mset.state()
state.FirstTime, state.LastTime, state.Deleted, state.NumDeleted = time.Time{}, time.Time{}, nil, 0

Expand Down Expand Up @@ -14656,7 +14656,7 @@ func TestJetStreamStreamInfoSubjectsDetails(t *testing.T) {
nc, js := jsClientConnect(t, s)
defer nc.Close()

getInfo := func(filter string) *StreamInfo {
getInfo := func(t *testing.T, filter string) *StreamInfo {
t.Helper()
// Need to grab StreamInfo by hand for now.
req, err := json.Marshal(&JSApiStreamInfoRequest{SubjectsFilter: filter})
Expand All @@ -14666,6 +14666,9 @@ func TestJetStreamStreamInfoSubjectsDetails(t *testing.T) {
var si StreamInfo
err = json.Unmarshal(resp.Data, &si)
require_NoError(t, err)
if si.State.NumSubjects != 3 {
t.Fatalf("Expected NumSubjects to be 3, but got %d", si.State.NumSubjects)
}
return &si
}

Expand All @@ -14689,23 +14692,173 @@ func TestJetStreamStreamInfoSubjectsDetails(t *testing.T) {

// Test all subjects first.
expected := map[string]uint64{"foo": 22, "bar": 33, "baz": 44}
if si := getInfo(nats.AllKeys); !reflect.DeepEqual(si.State.Subjects, expected) {
if si := getInfo(t, nats.AllKeys); !reflect.DeepEqual(si.State.Subjects, expected) {
t.Fatalf("Expected subjects of %+v, but got %+v", expected, si.State.Subjects)
}
if si := getInfo("*"); !reflect.DeepEqual(si.State.Subjects, expected) {
if si := getInfo(t, "*"); !reflect.DeepEqual(si.State.Subjects, expected) {
t.Fatalf("Expected subjects of %+v, but got %+v", expected, si.State.Subjects)
}
// Filtered to 1.
expected = map[string]uint64{"foo": 22}
if si := getInfo("foo"); !reflect.DeepEqual(si.State.Subjects, expected) {
if si := getInfo(t, "foo"); !reflect.DeepEqual(si.State.Subjects, expected) {
t.Fatalf("Expected subjects of %+v, but got %+v", expected, si.State.Subjects)
}
}

t.Run("MemoryStore", func(t *testing.T) { testSubjects(t, nats.MemoryStorage) })
t.Run("FileStore", func(t *testing.T) { testSubjects(t, nats.FileStorage) })
}

func TestJetStreamStreamInfoSubjectsDetailsWithDeleteAndPurge(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()

if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}

nc, js := jsClientConnect(t, s)
defer nc.Close()

getInfo := func(t *testing.T, filter string) *StreamInfo {
t.Helper()
// Need to grab StreamInfo by hand for now.
req, err := json.Marshal(&JSApiStreamInfoRequest{SubjectsFilter: filter})
require_NoError(t, err)
resp, err := nc.Request(fmt.Sprintf(JSApiStreamInfoT, "TEST"), req, time.Second)
require_NoError(t, err)
var si StreamInfo
err = json.Unmarshal(resp.Data, &si)
require_NoError(t, err)
return &si
}

checkResults := func(t *testing.T, expected map[string]uint64) {
t.Helper()
si := getInfo(t, nats.AllKeys)
if !reflect.DeepEqual(si.State.Subjects, expected) {
t.Fatalf("Expected subjects of %+v, but got %+v", expected, si.State.Subjects)
}
if si.State.NumSubjects != len(expected) {
t.Fatalf("Expected NumSubjects to be %d, but got %d", len(expected), si.State.NumSubjects)
}
}

testSubjects := func(t *testing.T, st nats.StorageType) {
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"*"},
Storage: st,
})
require_NoError(t, err)
defer js.DeleteStream("TEST")

msg := []byte("ok")
js.Publish("foo", msg) // 1
js.Publish("foo", msg) // 2
js.Publish("bar", msg) // 3
js.Publish("baz", msg) // 4
js.Publish("baz", msg) // 5
js.Publish("bar", msg) // 6
js.Publish("bar", msg) // 7

checkResults(t, map[string]uint64{"foo": 2, "bar": 3, "baz": 2})

// Now delete some messages.
js.DeleteMsg("TEST", 6)

checkResults(t, map[string]uint64{"foo": 2, "bar": 2, "baz": 2})

// Delete and add right back, so no-op
js.DeleteMsg("TEST", 5) // baz
js.Publish("baz", msg) // 8

checkResults(t, map[string]uint64{"foo": 2, "bar": 2, "baz": 2})

// Now do a purge only of bar.
jr, _ := json.Marshal(&JSApiStreamPurgeRequest{Subject: "bar"})
_, err = nc.Request(fmt.Sprintf(JSApiStreamPurgeT, "TEST"), jr, time.Second)
require_NoError(t, err)

checkResults(t, map[string]uint64{"foo": 2, "baz": 2})

// Now purge everything
err = js.PurgeStream("TEST")
require_NoError(t, err)

si := getInfo(t, nats.AllKeys)
if len(si.State.Subjects) != 0 {
t.Fatalf("Expected no subjects, but got %+v", si.State.Subjects)
}
if si.State.NumSubjects != 0 {
t.Fatalf("Expected NumSubjects to be 0, but got %d", si.State.NumSubjects)
}
}

t.Run("MemoryStore", func(t *testing.T) { testSubjects(t, nats.MemoryStorage) })
t.Run("FileStore", func(t *testing.T) { testSubjects(t, nats.FileStorage) })
}

func TestJetStreamStreamInfoSubjectsDetailsAfterRestart(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()

if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}

nc, js := jsClientConnect(t, s)
defer nc.Close()

getInfo := func(t *testing.T, filter string) *StreamInfo {
t.Helper()
// Need to grab StreamInfo by hand for now.
req, err := json.Marshal(&JSApiStreamInfoRequest{SubjectsFilter: filter})
require_NoError(t, err)
resp, err := nc.Request(fmt.Sprintf(JSApiStreamInfoT, "TEST"), req, time.Second)
require_NoError(t, err)
var si StreamInfo
err = json.Unmarshal(resp.Data, &si)
require_NoError(t, err)
return &si
}

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"*"},
})
require_NoError(t, err)
defer js.DeleteStream("TEST")

msg := []byte("ok")
js.Publish("foo", msg) // 1
js.Publish("foo", msg) // 2
js.Publish("bar", msg) // 3
js.Publish("baz", msg) // 4
js.Publish("baz", msg) // 5

si := getInfo(t, nats.AllKeys)
if si.State.NumSubjects != 3 {
t.Fatalf("Expected 3 subjects, but got %d", si.State.NumSubjects)
}

// Stop current
nc.Close()
sd := s.JetStreamConfig().StoreDir
s.Shutdown()
// Restart.
s = RunJetStreamServerOnPort(-1, sd)
defer s.Shutdown()

nc, _ = jsClientConnect(t, s)
defer nc.Close()

si = getInfo(t, nats.AllKeys)
if si.State.NumSubjects != 3 {
t.Fatalf("Expected 3 subjects, but got %d", si.State.NumSubjects)
}
}

// Issue #2836
func TestJetStreamInterestRetentionBug(t *testing.T) {
s := RunBasicJetStreamServer()
Expand Down
2 changes: 2 additions & 0 deletions server/memstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,7 @@ func (ms *memStore) FastState(state *StreamState) {
state.NumDeleted = int((state.LastSeq - state.FirstSeq) - state.Msgs + 1)
}
state.Consumers = ms.consumers
state.NumSubjects = len(ms.fss)
ms.mu.RUnlock()
}

Expand All @@ -763,6 +764,7 @@ func (ms *memStore) State() StreamState {

state := ms.state
state.Consumers = ms.consumers
state.NumSubjects = len(ms.fss)
state.Deleted = nil

// Calculate interior delete details.
Expand Down
25 changes: 13 additions & 12 deletions server/store.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019-2021 The NATS Authors
// Copyright 2019-2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -119,17 +119,18 @@ const (

// StreamState is information about the given stream.
type StreamState struct {
Msgs uint64 `json:"messages"`
Bytes uint64 `json:"bytes"`
FirstSeq uint64 `json:"first_seq"`
FirstTime time.Time `json:"first_ts"`
LastSeq uint64 `json:"last_seq"`
LastTime time.Time `json:"last_ts"`
Subjects map[string]uint64 `json:"subjects,omitempty"`
NumDeleted int `json:"num_deleted,omitempty"`
Deleted []uint64 `json:"deleted,omitempty"`
Lost *LostStreamData `json:"lost,omitempty"`
Consumers int `json:"consumer_count"`
Msgs uint64 `json:"messages"`
Bytes uint64 `json:"bytes"`
FirstSeq uint64 `json:"first_seq"`
FirstTime time.Time `json:"first_ts"`
LastSeq uint64 `json:"last_seq"`
LastTime time.Time `json:"last_ts"`
NumSubjects int `json:"num_subjects,omitempty"`
Subjects map[string]uint64 `json:"subjects,omitempty"`
NumDeleted int `json:"num_deleted,omitempty"`
Deleted []uint64 `json:"deleted,omitempty"`
Lost *LostStreamData `json:"lost,omitempty"`
Consumers int `json:"consumer_count"`
}

// SimpleState for filtered subject specific state.
Expand Down

0 comments on commit 5da0453

Please sign in to comment.