Skip to content

Commit

Permalink
If we have deliver last by subject and max msgs per subject of 1, we …
Browse files Browse the repository at this point in the history
…can short circuit to normal consumer.

Signed-off-by: Derek Collison <[email protected]>
  • Loading branch information
derekcollison committed Oct 27, 2023
1 parent 9edecc8 commit c999177
Showing 1 changed file with 38 additions and 32 deletions.
70 changes: 38 additions & 32 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4687,44 +4687,50 @@ func (o *consumer) selectStartingSeqNo() {
}
}
} else if o.cfg.DeliverPolicy == DeliverLastPerSubject {
// A threshold for when we switch from get last msg to subjects state.
const numSubjectsThresh = 256
lss := &lastSeqSkipList{resume: state.LastSeq}
var filters []string
if o.subjf == nil {
filters = append(filters, o.cfg.FilterSubject)
// If our parent stream is set to max msgs per subject of 1 this is just
// a normal consumer at this point. We can avoid any heavy lifting.
if o.mset.cfg.MaxMsgsPer == 1 {
o.sseq = state.FirstSeq
} else {
for _, filter := range o.subjf {
filters = append(filters, filter.subject)
// A threshold for when we switch from get last msg to subjects state.
const numSubjectsThresh = 256
lss := &lastSeqSkipList{resume: state.LastSeq}
var filters []string
if o.subjf == nil {
filters = append(filters, o.cfg.FilterSubject)
} else {
for _, filter := range o.subjf {
filters = append(filters, filter.subject)
}
}
}
for _, filter := range filters {
if st := o.mset.store.SubjectsTotals(filter); len(st) < numSubjectsThresh {
var smv StoreMsg
for subj := range st {
if sm, err := o.mset.store.LoadLastMsg(subj, &smv); err == nil {
lss.seqs = append(lss.seqs, sm.seq)
for _, filter := range filters {
if st := o.mset.store.SubjectsTotals(filter); len(st) < numSubjectsThresh {
var smv StoreMsg
for subj := range st {
if sm, err := o.mset.store.LoadLastMsg(subj, &smv); err == nil {
lss.seqs = append(lss.seqs, sm.seq)
}
}
} else if mss := o.mset.store.SubjectsState(filter); len(mss) > 0 {
for _, ss := range mss {
lss.seqs = append(lss.seqs, ss.Last)
}
}
} else if mss := o.mset.store.SubjectsState(filter); len(mss) > 0 {
for _, ss := range mss {
lss.seqs = append(lss.seqs, ss.Last)
}
}
// Sort the skip list if needed.
if len(lss.seqs) > 1 {
sort.Slice(lss.seqs, func(i, j int) bool {
return lss.seqs[j] > lss.seqs[i]
})
}
if len(lss.seqs) == 0 {
o.sseq = state.LastSeq
} else {
o.sseq = lss.seqs[0]
}
// Assign skip list.
o.lss = lss
}
// Sort the skip list if needed.
if len(lss.seqs) > 1 {
sort.Slice(lss.seqs, func(i, j int) bool {
return lss.seqs[j] > lss.seqs[i]
})
}
if len(lss.seqs) == 0 {
o.sseq = state.LastSeq
} else {
o.sseq = lss.seqs[0]
}
// Assign skip list.
o.lss = lss
} else if o.cfg.OptStartTime != nil {
// If we are here we are time based.
// TODO(dlc) - Once clustered can't rely on this.
Expand Down

0 comments on commit c999177

Please sign in to comment.