Skip to content

Commit

Permalink
[IMPROVED] Improved time to select skip list and starting sequence nu…
Browse files Browse the repository at this point in the history
…mber for deliver last by subject (#4712)

Previously we would need to move through all msg blocks and load all of
the blocks into memory. We optimized this regardless to flush the cache
if we loaded it like other places in the code, but for this specifically
we load subject's totals which is already in memory and for each
matching subject we now simply load the last message for that subject
and add the sequence number to the skip list.

This drastically improves consumers that want last per subject, like KV
watchers and object store lists.

Signed-off-by: Derek Collison <[email protected]>

Resolves: #4680
  • Loading branch information
wallyqs authored Oct 26, 2023
2 parents 2fb1b1b + a134ab8 commit 51b6a8e
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 36 deletions.
54 changes: 21 additions & 33 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4657,16 +4657,6 @@ type lastSeqSkipList struct {
seqs []uint64
}

// Will create a skip list for us from a store's subjects state.
func createLastSeqSkipList(mss map[string]SimpleState) []uint64 {
seqs := make([]uint64, 0, len(mss))
for _, ss := range mss {
seqs = append(seqs, ss.Last)
}
sort.Slice(seqs, func(i, j int) bool { return seqs[i] < seqs[j] })
return seqs
}

// Let's us know we have a skip list, which is for deliver last per subject and we are just starting.
// Lock should be held.
func (o *consumer) hasSkipListPending() bool {
Expand Down Expand Up @@ -4697,38 +4687,36 @@ func (o *consumer) selectStartingSeqNo() {
}
}
} else if o.cfg.DeliverPolicy == DeliverLastPerSubject {
lss := &lastSeqSkipList{resume: state.LastSeq}
var smv StoreMsg
var filters []string
if o.subjf == nil {
if mss := o.mset.store.SubjectsState(o.cfg.FilterSubject); len(mss) > 0 {
o.lss = &lastSeqSkipList{
resume: state.LastSeq,
seqs: createLastSeqSkipList(mss),
}
o.sseq = o.lss.seqs[0]
} else {
// If no mapping info just set to last.
o.sseq = state.LastSeq
filters = append(filters, o.cfg.FilterSubject)
} else {
for _, filter := range o.subjf {
filters = append(filters, filter.subject)
}
return
}
lss := &lastSeqSkipList{
resume: state.LastSeq,
}
for _, filter := range o.subjf {
if mss := o.mset.store.SubjectsState(filter.subject); len(mss) > 0 {
lss.seqs = append(lss.seqs, createLastSeqSkipList(mss)...)
for _, filter := range filters {
for subj := range o.mset.store.SubjectsTotals(filter) {
if sm, err := o.mset.store.LoadLastMsg(subj, &smv); err == nil {
lss.seqs = append(lss.seqs, sm.seq)
}
}
}
// Sort the skip list if needed.
if len(lss.seqs) > 1 {
sort.Slice(lss.seqs, func(i, j int) bool {
return lss.seqs[j] > lss.seqs[i]
})
}
if len(lss.seqs) == 0 {
o.sseq = state.LastSeq
} else {
o.sseq = lss.seqs[0]
}
// Sort the skip list
sort.Slice(lss.seqs, func(i, j int) bool {
return lss.seqs[j] > lss.seqs[i]
})
// Assign skip list.
o.lss = lss
if len(o.lss.seqs) != 0 {
o.sseq = o.lss.seqs[0]
}
} else if o.cfg.OptStartTime != nil {
// If we are here we are time based.
// TODO(dlc) - Once clustered can't rely on this.
Expand Down
14 changes: 11 additions & 3 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2453,8 +2453,12 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState {
}

mb.mu.Lock()
// Make sure we have fss loaded.
mb.ensurePerSubjectInfoLoaded()
var shouldExpire bool
if mb.cacheNotLoaded() {
// Make sure we have fss loaded.
mb.loadMsgsWithLock()
shouldExpire = true
}
for subj, ss := range mb.fss {
if subject == _EMPTY_ || subject == fwcs || subjectIsSubsetMatch(subj, subject) {
if ss.firstNeedsUpdate {
Expand All @@ -2470,6 +2474,10 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState {
}
}
}
if shouldExpire {
// Expire this cache before moving on.
mb.tryForceExpireCacheLocked()
}
mb.mu.Unlock()

if mb == stop {
Expand Down Expand Up @@ -6748,7 +6756,7 @@ func (mb *msgBlock) generatePerSubjectInfo() error {
if err := mb.loadMsgsWithLock(); err != nil {
return err
}
// indexCaceheBuf can produce fss now, so if non-nil we are good.
// indexCacheBuf can produce fss now, so if non-nil we are good.
if mb.fss != nil {
return nil
}
Expand Down

0 comments on commit 51b6a8e

Please sign in to comment.