Skip to content

Commit

Permalink
Make sure if we have lots of subjects to fallback to subjects state v…
Browse files Browse the repository at this point in the history
…s get last.

Signed-off-by: Derek Collison <[email protected]>
  • Loading branch information
derekcollison committed Oct 27, 2023
1 parent 51b6a8e commit e0ae88c
Showing 1 changed file with 12 additions and 4 deletions.
16 changes: 12 additions & 4 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4687,8 +4687,9 @@ func (o *consumer) selectStartingSeqNo() {
}
}
} else if o.cfg.DeliverPolicy == DeliverLastPerSubject {
// A threshold for when we switch from get last msg to subjects state.
const numSubjectsThresh = 256
lss := &lastSeqSkipList{resume: state.LastSeq}
var smv StoreMsg
var filters []string
if o.subjf == nil {
filters = append(filters, o.cfg.FilterSubject)
Expand All @@ -4698,9 +4699,16 @@ func (o *consumer) selectStartingSeqNo() {
}
}
for _, filter := range filters {
for subj := range o.mset.store.SubjectsTotals(filter) {
if sm, err := o.mset.store.LoadLastMsg(subj, &smv); err == nil {
lss.seqs = append(lss.seqs, sm.seq)
if st := o.mset.store.SubjectsTotals(filter); len(st) < numSubjectsThresh {
var smv StoreMsg
for subj := range st {
if sm, err := o.mset.store.LoadLastMsg(subj, &smv); err == nil {
lss.seqs = append(lss.seqs, sm.seq)
}
}
} else if mss := o.mset.store.SubjectsState(filter); len(mss) > 0 {
for _, ss := range mss {
lss.seqs = append(lss.seqs, ss.Last)
}
}
}
Expand Down

0 comments on commit e0ae88c

Please sign in to comment.