Skip to content

Commit

Permalink
recreate compacted boltdb files from compactor when they are more tha…
Browse files Browse the repository at this point in the history
…n 12 hours old (#4853)
  • Loading branch information
sandeepsukhani authored Dec 2, 2021
1 parent 6347faf commit 3ac9818
Show file tree
Hide file tree
Showing 3 changed files with 272 additions and 40 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
* [4736](https://github.com/grafana/loki/pull/4736) **sandeepsukhani**: allow applying retention at different interval than compaction
* [4744](https://github.com/grafana/loki/pull/4744) **cyriltovena**: Promtail: Adds GELF UDP support.
* [4741](https://github.com/grafana/loki/pull/4741) **sandeepsukhani**: index cleanup fixes while applying retention
* [4853](https://github.com/grafana/loki/pull/4853) **sandeepsukhani**: recreate compacted boltdb files from compactor to reduce storage space usage

# 2.4.1 (2021/11/07)

Expand Down
162 changes: 124 additions & 38 deletions pkg/storage/stores/shipper/compactor/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,16 @@ import (
)

const (
compactMinDBs = 4
uploaderName = "compactor"
uploaderName = "compactor"

readDBsParallelism = 50
batchSize = 1000

// we want to recreate compactedDB when the chances of it changing due to compaction or deletion of data are low.
// this is to avoid recreation of the DB too often which would be too costly in a large cluster.
recreateCompactedDBOlderThan = 12 * time.Hour
dropFreePagesTxMaxSize = 100 * 1024 * 1024 // 100MB
recreatedCompactedDBSuffix = ".r.gz"
)

var bucketName = []byte("index")
Expand All @@ -42,8 +47,12 @@ type table struct {
applyRetention bool
tableMarker retention.TableMarker

compactedDB *bbolt.DB
logger log.Logger
sourceFiles []storage.IndexFile
compactedDB *bbolt.DB
compactedDBRecreated bool
uploadCompactedDB bool
removeSourceFiles bool
logger log.Logger

ctx context.Context
quit chan struct{}
Expand Down Expand Up @@ -75,6 +84,12 @@ func (t *table) compact(tableHasExpiredStreams bool) error {
return err
}

if len(indexFiles) == 0 {
level.Info(t.logger).Log("msg", "no index files found")
return nil
}

t.sourceFiles = indexFiles
level.Info(t.logger).Log("msg", "listed files", "count", len(indexFiles))

defer func() {
Expand All @@ -86,74 +101,141 @@ func (t *table) compact(tableHasExpiredStreams bool) error {

applyRetention := t.applyRetention && tableHasExpiredStreams

if !applyRetention {
if len(indexFiles) < compactMinDBs {
level.Info(t.logger).Log("msg", fmt.Sprintf("skipping compaction since we have just %d files in storage", len(indexFiles)))
return nil
}
if len(indexFiles) > 1 {
if err := t.compactFiles(indexFiles); err != nil {
return err
}
// upload the compacted db
err = t.upload()

// we have compacted the files to a single file so let use upload the compacted db and remove the source files.
t.uploadCompactedDB = true
t.removeSourceFiles = true
} else if !applyRetention && !t.mustRecreateCompactedDB() {
return nil
} else {
// download the db for applying retention or recreating the compacted db
downloadAt := filepath.Join(t.workingDirectory, indexFiles[0].Name)
err = shipper_util.GetFileFromStorage(t.ctx, t.indexStorageClient, t.name, indexFiles[0].Name, downloadAt, false)
if err != nil {
return err
}
t.compactedDB, err = openBoltdbFileWithNoSync(downloadAt)
if err != nil {
return err
}
}

// remove source files from storage which were compacted
err = t.removeFilesFromStorage(indexFiles)
if applyRetention {
empty, modified, err := t.tableMarker.MarkForDelete(t.ctx, t.name, t.compactedDB)
if err != nil {
return err
}
return nil

if empty {
// we have deleted all the data so we can remove the source files without uploading the compacted db
t.removeSourceFiles = true
t.uploadCompactedDB = false
} else if modified {
// we have modified the compacted db so we need to upload the compacted db and remove the source file(s)
t.uploadCompactedDB = true
t.removeSourceFiles = true
}
}

var compacted bool
if len(indexFiles) > 1 {
if err := t.compactFiles(indexFiles); err != nil {
// file was not modified so see if we must recreate the compacted db to optimize storage usage
if !t.uploadCompactedDB && !t.removeSourceFiles && t.mustRecreateCompactedDB() {
err := t.recreateCompactedDB()
if err != nil {
return err
}
compacted = true

// we have recreated the compacted db so we need to upload the compacted db and remove the source file
t.uploadCompactedDB = true
t.removeSourceFiles = true
t.compactedDBRecreated = true
}

if len(indexFiles) == 1 {
// download the db
downloadAt := filepath.Join(t.workingDirectory, fmt.Sprint(time.Now().Unix()))
err = shipper_util.GetFileFromStorage(t.ctx, t.indexStorageClient, t.name, indexFiles[0].Name, downloadAt, false)
return t.done()
}

// done takes care of uploading the files and cleaning up the working directory based on the value in uploadCompactedDB and removeSourceFiles
func (t *table) done() error {
if t.uploadCompactedDB {
err := t.upload()
if err != nil {
return err
}
t.compactedDB, err = openBoltdbFileWithNoSync(downloadAt)
}

if t.removeSourceFiles {
err := t.removeSourceFilesFromStorage()
if err != nil {
return err
}
}

if t.compactedDB == nil {
level.Info(t.logger).Log("msg", "skipping compaction no files found.")
return nil
return nil
}

// mustRecreateCompactedDB returns true if the compacted db should be recreated
func (t *table) mustRecreateCompactedDB() bool {
if len(t.sourceFiles) != 1 {
// do not recreate if there are multiple source files
return false
} else if time.Since(t.sourceFiles[0].ModifiedAt) < recreateCompactedDBOlderThan {
// do not recreate if the source file is younger than the threshold
return false
}

empty, modified, err := t.tableMarker.MarkForDelete(t.ctx, t.name, t.compactedDB)
// recreate the compacted db only if we have not recreated it before
return !strings.HasSuffix(t.sourceFiles[0].Name, recreatedCompactedDBSuffix)
}

// recreateCompactedDB just copies the old db to the new one using bbolt.Compact for following reasons:
// 1. When files are deleted, boltdb leaves free pages in the file. The only way to drop those free pages is to re-create the file.
// See https://github.com/boltdb/bolt/issues/308 for more details.
// 2. boltdb by default fills only about 50% of the page in the file. See https://github.com/etcd-io/bbolt/blob/master/bucket.go#L26.
// This setting is optimal for unordered writes.
// bbolt.Compact fills the whole page by setting FillPercent to 1 which works well here since while copying the data, it receives the index entries in order.
// The storage space goes down from anywhere between 25% to 50% as per my(Sandeep) tests.
func (t *table) recreateCompactedDB() error {
destDB, err := openBoltdbFileWithNoSync(filepath.Join(t.workingDirectory, fmt.Sprint(time.Now().Unix())))
if err != nil {
return err
}

if empty {
return t.removeFilesFromStorage(indexFiles)
level.Info(t.logger).Log("msg", "recreating compacted db")

err = bbolt.Compact(destDB, t.compactedDB, dropFreePagesTxMaxSize)
if err != nil {
return err
}

if !modified && !compacted {
// we didn't make a modification so let's just return
sourceSize := int64(0)
destSize := int64(0)

if err := t.compactedDB.View(func(tx *bbolt.Tx) error {
sourceSize = tx.Size()
return nil
}); err != nil {
return err
}

err = t.upload()
if err := destDB.View(func(tx *bbolt.Tx) error {
destSize = tx.Size()
return nil
}); err != nil {
return err
}

level.Info(t.logger).Log("msg", "recreated compacted db", "src_size_bytes", sourceSize, "dest_size_bytes", destSize)

err = t.compactedDB.Close()
if err != nil {
return err
}

return t.removeFilesFromStorage(indexFiles)
t.compactedDB = destDB
return nil
}

func (t *table) compactFiles(files []storage.IndexFile) error {
Expand Down Expand Up @@ -388,17 +470,21 @@ func (t *table) upload() error {
}
}()

fileName := fmt.Sprintf("%s.gz", shipper_util.BuildIndexFileName(t.name, uploaderName, fmt.Sprint(time.Now().Unix())))
fileNameFormat := "%s.gz"
if t.compactedDBRecreated {
fileNameFormat = "%s" + recreatedCompactedDBSuffix
}
fileName := fmt.Sprintf(fileNameFormat, shipper_util.BuildIndexFileName(t.name, uploaderName, fmt.Sprint(time.Now().Unix())))
level.Info(t.logger).Log("msg", "uploading the compacted file", "fileName", fileName)

return t.indexStorageClient.PutFile(t.ctx, t.name, fileName, compressedDB)
}

// removeFilesFromStorage deletes index files from storage.
func (t *table) removeFilesFromStorage(files []storage.IndexFile) error {
level.Info(t.logger).Log("msg", "removing source db files from storage", "count", len(files))
// removeSourceFilesFromStorage deletes source db files from storage.
func (t *table) removeSourceFilesFromStorage() error {
level.Info(t.logger).Log("msg", "removing source db files from storage", "count", len(t.sourceFiles))

for _, file := range files {
for _, file := range t.sourceFiles {
err := t.indexStorageClient.DeleteFile(t.ctx, t.name, file.Name)
if err != nil {
return err
Expand Down
Loading

0 comments on commit 3ac9818

Please sign in to comment.