diff --git a/level_handler.go b/level_handler.go index 2cd106248..dbc2532ba 100644 --- a/level_handler.go +++ b/level_handler.go @@ -140,6 +140,31 @@ func (s *levelHandler) replaceTables(toDel, toAdd []*table.Table) error { return decrRefs(toDel) } +// addTable adds toAdd table to levelHandler. Normally when we add tables to levelHandler, we sort +// tables based on table.Smallest. This is required for correctness of the system. But in case of +// stream writer this can be avoided. We can just add tables to levelHandler's table list +// and after all addTable calls, we can sort table list(check sortTable method). +// NOTE: levelHandler.sortTables() should be called after call addTable calls are done. +func (s *levelHandler) addTable(t *table.Table) { + s.Lock() + defer s.Unlock() + + s.totalSize += t.Size() // Increase totalSize first. + t.IncrRef() + s.tables = append(s.tables, t) +} + +// sortTables sorts tables of levelHandler based on table.Smallest. +// Normally it should be called after all addTable calls. +func (s *levelHandler) sortTables() { + s.RLock() + defer s.RUnlock() + + sort.Slice(s.tables, func(i, j int) bool { + return y.CompareKeys(s.tables[i].Smallest(), s.tables[j].Smallest()) < 0 + }) +} + func decrRefs(tables []*table.Table) error { for _, table := range tables { if err := table.DecrRef(); err != nil { diff --git a/stream_writer.go b/stream_writer.go index 08e178346..487468290 100644 --- a/stream_writer.go +++ b/stream_writer.go @@ -243,6 +243,11 @@ func (sw *StreamWriter) Flush() error { return err } + // Sort tables at the end. + for _, l := range sw.db.lc.levels { + l.sortTables() + } + // Now sync the directories, so all the files are registered. if sw.db.opt.ValueDir != sw.db.opt.Dir { if err := syncDir(sw.db.opt.ValueDir); err != nil { @@ -449,9 +454,11 @@ func (w *sortedWriter) createTable(builder *table.Builder) error { if err := w.db.manifest.addChanges([]*pb.ManifestChange{change}); err != nil { return err } - if err := lhandler.replaceTables([]*table.Table{}, []*table.Table{tbl}); err != nil { - return err - } + + // We are not calling lhandler.replaceTables() here, as it sorts tables on every addition. + // We can sort all tables only once during Flush() call. + lhandler.addTable(tbl) + // Release the ref held by OpenTable. _ = tbl.DecrRef() w.db.opt.Infof("Table created: %d at level: %d for stream: %d. Size: %s\n",