Skip to content

Commit

Permalink
Merge commit '9edecc8' into release/v2.10.4
Browse files Browse the repository at this point in the history
  • Loading branch information
wallyqs committed Oct 27, 2023
2 parents 7fd6c58 + 9edecc8 commit 923ef90
Showing 1 changed file with 12 additions and 4 deletions.
16 changes: 12 additions & 4 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4687,8 +4687,9 @@ func (o *consumer) selectStartingSeqNo() {
}
}
} else if o.cfg.DeliverPolicy == DeliverLastPerSubject {
// A threshold for when we switch from get last msg to subjects state.
const numSubjectsThresh = 256
lss := &lastSeqSkipList{resume: state.LastSeq}
var smv StoreMsg
var filters []string
if o.subjf == nil {
filters = append(filters, o.cfg.FilterSubject)
Expand All @@ -4698,9 +4699,16 @@ func (o *consumer) selectStartingSeqNo() {
}
}
for _, filter := range filters {
for subj := range o.mset.store.SubjectsTotals(filter) {
if sm, err := o.mset.store.LoadLastMsg(subj, &smv); err == nil {
lss.seqs = append(lss.seqs, sm.seq)
if st := o.mset.store.SubjectsTotals(filter); len(st) < numSubjectsThresh {
var smv StoreMsg
for subj := range st {
if sm, err := o.mset.store.LoadLastMsg(subj, &smv); err == nil {
lss.seqs = append(lss.seqs, sm.seq)
}
}
} else if mss := o.mset.store.SubjectsState(filter); len(mss) > 0 {
for _, ss := range mss {
lss.seqs = append(lss.seqs, ss.Last)
}
}
}
Expand Down

0 comments on commit 923ef90

Please sign in to comment.