Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimise createTable in stream_writer.go #1132

Merged
merged 3 commits into from
Nov 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions level_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,31 @@ func (s *levelHandler) replaceTables(toDel, toAdd []*table.Table) error {
return decrRefs(toDel)
}

// addTable adds toAdd table to levelHandler. Normally when we add tables to levelHandler, we sort
// tables based on table.Smallest. This is required for correctness of the system. But in case of
// stream writer this can be avoided. We can just add tables to levelHandler's table list
// and after all addTable calls, we can sort table list(check sortTable method).
// NOTE: levelHandler.sortTables() should be called after call addTable calls are done.
func (s *levelHandler) addTable(t *table.Table) {
s.Lock()
defer s.Unlock()

s.totalSize += t.Size() // Increase totalSize first.
t.IncrRef()
s.tables = append(s.tables, t)
}

// sortTables sorts tables of levelHandler based on table.Smallest.
// Normally it should be called after all addTable calls.
func (s *levelHandler) sortTables() {
s.RLock()
defer s.RUnlock()

sort.Slice(s.tables, func(i, j int) bool {
return y.CompareKeys(s.tables[i].Smallest(), s.tables[j].Smallest()) < 0
})
}

func decrRefs(tables []*table.Table) error {
for _, table := range tables {
if err := table.DecrRef(); err != nil {
Expand Down
13 changes: 10 additions & 3 deletions stream_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,11 @@ func (sw *StreamWriter) Flush() error {
return err
}

// Sort tables at the end.
for _, l := range sw.db.lc.levels {
l.sortTables()
}

// Now sync the directories, so all the files are registered.
if sw.db.opt.ValueDir != sw.db.opt.Dir {
if err := syncDir(sw.db.opt.ValueDir); err != nil {
Expand Down Expand Up @@ -449,9 +454,11 @@ func (w *sortedWriter) createTable(builder *table.Builder) error {
if err := w.db.manifest.addChanges([]*pb.ManifestChange{change}); err != nil {
return err
}
if err := lhandler.replaceTables([]*table.Table{}, []*table.Table{tbl}); err != nil {
return err
}

// We are not calling lhandler.replaceTables() here, as it sorts tables on every addition.
// We can sort all tables only once during Flush() call.
lhandler.addTable(tbl)

// Release the ref held by OpenTable.
_ = tbl.DecrRef()
w.db.opt.Infof("Table created: %d at level: %d for stream: %d. Size: %s\n",
Expand Down