Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IMPROVED] Improved time to select skip list and starting sequence number for deliver last by subject #4712

Merged
merged 1 commit into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 21 additions & 33 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4657,16 +4657,6 @@ type lastSeqSkipList struct {
seqs []uint64
}

// Will create a skip list for us from a store's subjects state.
func createLastSeqSkipList(mss map[string]SimpleState) []uint64 {
seqs := make([]uint64, 0, len(mss))
for _, ss := range mss {
seqs = append(seqs, ss.Last)
}
sort.Slice(seqs, func(i, j int) bool { return seqs[i] < seqs[j] })
return seqs
}

// Let's us know we have a skip list, which is for deliver last per subject and we are just starting.
// Lock should be held.
func (o *consumer) hasSkipListPending() bool {
Expand Down Expand Up @@ -4697,38 +4687,36 @@ func (o *consumer) selectStartingSeqNo() {
}
}
} else if o.cfg.DeliverPolicy == DeliverLastPerSubject {
lss := &lastSeqSkipList{resume: state.LastSeq}
var smv StoreMsg
var filters []string
if o.subjf == nil {
if mss := o.mset.store.SubjectsState(o.cfg.FilterSubject); len(mss) > 0 {
o.lss = &lastSeqSkipList{
resume: state.LastSeq,
seqs: createLastSeqSkipList(mss),
}
o.sseq = o.lss.seqs[0]
} else {
// If no mapping info just set to last.
o.sseq = state.LastSeq
filters = append(filters, o.cfg.FilterSubject)
} else {
for _, filter := range o.subjf {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could potentially preallocate filters to the length of o.subjf here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be a small number or 1 in majority of cases.

filters = append(filters, filter.subject)
}
return
}
lss := &lastSeqSkipList{
resume: state.LastSeq,
}
for _, filter := range o.subjf {
if mss := o.mset.store.SubjectsState(filter.subject); len(mss) > 0 {
lss.seqs = append(lss.seqs, createLastSeqSkipList(mss)...)
for _, filter := range filters {
for subj := range o.mset.store.SubjectsTotals(filter) {
if sm, err := o.mset.store.LoadLastMsg(subj, &smv); err == nil {
lss.seqs = append(lss.seqs, sm.seq)
}
}
}
// Sort the skip list if needed.
if len(lss.seqs) > 1 {
sort.Slice(lss.seqs, func(i, j int) bool {
return lss.seqs[j] > lss.seqs[i]
})
}
if len(lss.seqs) == 0 {
o.sseq = state.LastSeq
} else {
o.sseq = lss.seqs[0]
}
// Sort the skip list
sort.Slice(lss.seqs, func(i, j int) bool {
return lss.seqs[j] > lss.seqs[i]
})
// Assign skip list.
o.lss = lss
if len(o.lss.seqs) != 0 {
o.sseq = o.lss.seqs[0]
}
} else if o.cfg.OptStartTime != nil {
// If we are here we are time based.
// TODO(dlc) - Once clustered can't rely on this.
Expand Down
14 changes: 11 additions & 3 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2453,8 +2453,12 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState {
}

mb.mu.Lock()
// Make sure we have fss loaded.
mb.ensurePerSubjectInfoLoaded()
var shouldExpire bool
if mb.cacheNotLoaded() {
// Make sure we have fss loaded.
mb.loadMsgsWithLock()
shouldExpire = true
}
for subj, ss := range mb.fss {
if subject == _EMPTY_ || subject == fwcs || subjectIsSubsetMatch(subj, subject) {
if ss.firstNeedsUpdate {
Expand All @@ -2470,6 +2474,10 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState {
}
}
}
if shouldExpire {
// Expire this cache before moving on.
mb.tryForceExpireCacheLocked()
}
mb.mu.Unlock()

if mb == stop {
Expand Down Expand Up @@ -6748,7 +6756,7 @@ func (mb *msgBlock) generatePerSubjectInfo() error {
if err := mb.loadMsgsWithLock(); err != nil {
return err
}
// indexCaceheBuf can produce fss now, so if non-nil we are good.
// indexCacheBuf can produce fss now, so if non-nil we are good.
if mb.fss != nil {
return nil
}
Expand Down
Loading