Skip to content

Commit

Permalink
Improved time to select skip list and starting sequence number for de…
Browse files Browse the repository at this point in the history
…liver last by subject.

Previously we would need to move through all msg blocks and load all of the blocks into memory.
We optimized this regardless to flush the cache if we loaded it like other places in the code, but for this specifically we load subject's totals which is already in memory and for each matching subject we now simply load the last message for that subject and add the sequence number to the skip list.

This drastically improves consumers that want last per subject, like KV watchers and object store lists.

Signed-off-by: Derek Collison <[email protected]>
  • Loading branch information
derekcollison committed Oct 26, 2023
1 parent 2fb1b1b commit a134ab8
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 36 deletions.
54 changes: 21 additions & 33 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4657,16 +4657,6 @@ type lastSeqSkipList struct {
seqs []uint64
}

// Will create a skip list for us from a store's subjects state.
func createLastSeqSkipList(mss map[string]SimpleState) []uint64 {
seqs := make([]uint64, 0, len(mss))
for _, ss := range mss {
seqs = append(seqs, ss.Last)
}
sort.Slice(seqs, func(i, j int) bool { return seqs[i] < seqs[j] })
return seqs
}

// Let's us know we have a skip list, which is for deliver last per subject and we are just starting.
// Lock should be held.
func (o *consumer) hasSkipListPending() bool {
Expand Down Expand Up @@ -4697,38 +4687,36 @@ func (o *consumer) selectStartingSeqNo() {
}
}
} else if o.cfg.DeliverPolicy == DeliverLastPerSubject {
lss := &lastSeqSkipList{resume: state.LastSeq}
var smv StoreMsg
var filters []string
if o.subjf == nil {
if mss := o.mset.store.SubjectsState(o.cfg.FilterSubject); len(mss) > 0 {
o.lss = &lastSeqSkipList{
resume: state.LastSeq,
seqs: createLastSeqSkipList(mss),
}
o.sseq = o.lss.seqs[0]
} else {
// If no mapping info just set to last.
o.sseq = state.LastSeq
filters = append(filters, o.cfg.FilterSubject)
} else {
for _, filter := range o.subjf {
filters = append(filters, filter.subject)
}
return
}
lss := &lastSeqSkipList{
resume: state.LastSeq,
}
for _, filter := range o.subjf {
if mss := o.mset.store.SubjectsState(filter.subject); len(mss) > 0 {
lss.seqs = append(lss.seqs, createLastSeqSkipList(mss)...)
for _, filter := range filters {
for subj := range o.mset.store.SubjectsTotals(filter) {
if sm, err := o.mset.store.LoadLastMsg(subj, &smv); err == nil {
lss.seqs = append(lss.seqs, sm.seq)
}
}
}
// Sort the skip list if needed.
if len(lss.seqs) > 1 {
sort.Slice(lss.seqs, func(i, j int) bool {
return lss.seqs[j] > lss.seqs[i]
})
}
if len(lss.seqs) == 0 {
o.sseq = state.LastSeq
} else {
o.sseq = lss.seqs[0]
}
// Sort the skip list
sort.Slice(lss.seqs, func(i, j int) bool {
return lss.seqs[j] > lss.seqs[i]
})
// Assign skip list.
o.lss = lss
if len(o.lss.seqs) != 0 {
o.sseq = o.lss.seqs[0]
}
} else if o.cfg.OptStartTime != nil {
// If we are here we are time based.
// TODO(dlc) - Once clustered can't rely on this.
Expand Down
14 changes: 11 additions & 3 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2453,8 +2453,12 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState {
}

mb.mu.Lock()
// Make sure we have fss loaded.
mb.ensurePerSubjectInfoLoaded()
var shouldExpire bool
if mb.cacheNotLoaded() {
// Make sure we have fss loaded.
mb.loadMsgsWithLock()
shouldExpire = true
}
for subj, ss := range mb.fss {
if subject == _EMPTY_ || subject == fwcs || subjectIsSubsetMatch(subj, subject) {
if ss.firstNeedsUpdate {
Expand All @@ -2470,6 +2474,10 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState {
}
}
}
if shouldExpire {
// Expire this cache before moving on.
mb.tryForceExpireCacheLocked()
}
mb.mu.Unlock()

if mb == stop {
Expand Down Expand Up @@ -6748,7 +6756,7 @@ func (mb *msgBlock) generatePerSubjectInfo() error {
if err := mb.loadMsgsWithLock(); err != nil {
return err
}
// indexCaceheBuf can produce fss now, so if non-nil we are good.
// indexCacheBuf can produce fss now, so if non-nil we are good.
if mb.fss != nil {
return nil
}
Expand Down

0 comments on commit a134ab8

Please sign in to comment.