Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimise createTable in stream_writer.go #1132

Merged
merged 3 commits into from
Nov 28, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions level_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,33 @@ func (s *levelHandler) replaceTables(toDel, toAdd []*table.Table) error {
return decrRefs(toDel)
}

// addTable adds toAdd tables to levelHandler. Normally when we add tables to levelHandler, we sort
// tables based on table.Smallest. This is required for correctness of the system. But in some cases
// this can be avoided(such as stream writer). We can just add tables to levelHandler's table list
// and after all addTables calls, we can sort table list(check sortTable method).
// NOTE: addTables and sortTables duplicate some code from replaceTables().
func (s *levelHandler) addTables(toAdd []*table.Table) {
s.Lock()
defer s.Unlock()

// Increase totalSize first.
for _, t := range toAdd {
s.totalSize += t.Size()
t.IncrRef()
s.tables = append(s.tables, t)
}
}

// sortTables sorts tables of levelHandler based on table.Smallest.
func (s *levelHandler) sortTables() {
s.RLock()
defer s.RUnlock()

sort.Slice(s.tables, func(i, j int) bool {
return y.CompareKeys(s.tables[i].Smallest(), s.tables[j].Smallest()) < 0
})
}

func decrRefs(tables []*table.Table) error {
for _, table := range tables {
if err := table.DecrRef(); err != nil {
Expand Down
13 changes: 10 additions & 3 deletions stream_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,11 @@ func (sw *StreamWriter) Flush() error {
return err
}

// Sort tables at the end.
for _, l := range sw.db.lc.levels {
l.sortTables()
}

// Now sync the directories, so all the files are registered.
if sw.db.opt.ValueDir != sw.db.opt.Dir {
if err := syncDir(sw.db.opt.ValueDir); err != nil {
Expand Down Expand Up @@ -449,9 +454,11 @@ func (w *sortedWriter) createTable(builder *table.Builder) error {
if err := w.db.manifest.addChanges([]*pb.ManifestChange{change}); err != nil {
return err
}
if err := lhandler.replaceTables([]*table.Table{}, []*table.Table{tbl}); err != nil {
return err
}

// We are not calling lhandler.replaceTables() here, as it sorts tables on every addition.
// We can sort all tables only once during Flush() call.
lhandler.addTables([]*table.Table{tbl})

// Release the ref held by OpenTable.
_ = tbl.DecrRef()
w.db.opt.Infof("Table created: %d at level: %d for stream: %d. Size: %s\n",
Expand Down