upload boltdb files from shipper only when they are not expected to be modified or during shutdown (#2487)

* upload boltdb files from shipper only when they are not expected to be modified or during shutdown

* updated comment

* add one more test condition to make sure files are not re-uploaded

Co-authored-by: Edward Welch <[email protected]>
sandeepsukhani and slim-bean authored Aug 24, 2020
1 parent b4c6065 commit 9b0740c
Showing 3 changed files with 149 additions and 14 deletions.
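The core of the change, as a minimal standalone sketch (`shardDuration` and `shardFor` are illustrative names, not identifiers from this commit): a write whose shard predates process start is clamped forward to the start-time shard, so files uploaded before the last shutdown are never modified again and each uploaded file stays immutable.

```go
package main

import (
	"fmt"
	"time"
)

// shardDuration stands in for shardDBsByDuration in the real code.
const shardDuration = 15 * time.Minute

// shardFor mirrors the clamping added to Table.write: a write whose
// shard would predate modifyShardsSince is redirected to that shard.
func shardFor(tm time.Time, modifyShardsSince int64) int64 {
	shard := tm.Truncate(shardDuration).Unix()
	if shard < modifyShardsSince {
		shard = modifyShardsSince
	}
	return shard
}

func main() {
	start := time.Now().Unix() // set once at process init
	old := time.Now().Add(-2 * time.Hour)
	// old writes land in the init-time shard instead of an already-uploaded one
	fmt.Println(shardFor(old, start) == start) // true
}
```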
31 changes: 28 additions & 3 deletions pkg/storage/stores/shipper/uploads/table.go
@@ -8,6 +8,7 @@ import (
"os"
"path"
"path/filepath"
"regexp"
"strings"
"sync"
"time"
@@ -55,6 +56,7 @@ type Table struct {

uploadedDBsMtime map[string]time.Time
uploadedDBsMtimeMtx sync.RWMutex
modifyShardsSince int64
}

// NewTable creates a new Table without looking for any existing local dbs belonging to the table.
@@ -90,6 +92,7 @@ func newTableWithDBs(dbs map[string]*bbolt.DB, path, uploader string, storageCli
boltdbIndexClient: boltdbIndexClient,
dbs: dbs,
uploadedDBsMtime: map[string]time.Time{},
modifyShardsSince: time.Now().Unix(),
}, nil
}

@@ -140,9 +143,13 @@ func (lt *Table) Write(ctx context.Context, writes local.TableWrites) error {
// db files are named after the time shard, i.e. the epoch of the truncated time.
// If a db file does not exist for a shard it gets created.
func (lt *Table) write(ctx context.Context, tm time.Time, writes local.TableWrites) error {
- shard := fmt.Sprint(tm.Truncate(shardDBsByDuration).Unix())
+ // do not write to files older than the init time, otherwise we might end up modifying a file which was already created and uploaded before the last shutdown.
+ shard := tm.Truncate(shardDBsByDuration).Unix()
+ if shard < lt.modifyShardsSince {
+ shard = lt.modifyShardsSince
+ }

- db, err := lt.getOrAddDB(shard)
+ db, err := lt.getOrAddDB(fmt.Sprint(shard))
if err != nil {
return err
}
@@ -185,13 +192,31 @@ func (lt *Table) RemoveDB(name string) error {
}

// Upload uploads all the dbs which have never been uploaded, or which have been modified since the last batch was uploaded.
- func (lt *Table) Upload(ctx context.Context) error {
+ func (lt *Table) Upload(ctx context.Context, force bool) error {
lt.dbsMtx.RLock()
defer lt.dbsMtx.RUnlock()

// upload files excluding the active shard. It could happen that we just started a new shard but the file for the last shard is still being updated due to pending writes or a pending flush to disk.
// To avoid uploading it, we also exclude the previously active shard if it became inactive less than a minute ago.
uploadShardsBefore := fmt.Sprint(time.Now().Add(-time.Minute).Truncate(shardDBsByDuration).Unix())

// Only consider files which are sharded and have just an epoch in their name.
// Before sharding was introduced we had a single file per table, which was moved inside the folder per table as part of the migration.
// Those files were named <table_prefix><period>.
// Since sharding was introduced we get a new file every 15 mins, and its name is just an epoch timestamp, e.g. `1597927538`.
// We can remove this check once we no longer support upgrading from 1.5.0.
filenameWithEpochRe, err := regexp.Compile(`^[0-9]{10}$`)
if err != nil {
return err
}

level.Info(util.Logger).Log("msg", fmt.Sprintf("uploading table %s", lt.name))

for name, db := range lt.dbs {
// comparing unix timestamps as strings is safe here since they all have the same length
if !force && filenameWithEpochRe.MatchString(name) && name >= uploadShardsBefore {
continue
}
stat, err := os.Stat(db.Path())
if err != nil {
return err
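A subtlety in the Upload change above: shard names are compared against the cutoff as strings. That is safe because every name matched by `^[0-9]{10}$` has exactly ten digits, so lexicographic order coincides with numeric order. A small self-contained sketch of the gating logic (the file names are made up for illustration):

```go
package main

import (
	"fmt"
	"regexp"
	"time"
)

func main() {
	filenameWithEpochRe := regexp.MustCompile(`^[0-9]{10}$`)

	// same cutoff as Upload: anything at or after it may still be written to
	uploadShardsBefore := fmt.Sprint(time.Now().Add(-time.Minute).Truncate(15 * time.Minute).Unix())

	names := []string{
		"1597927538",                  // old shard: gets uploaded
		fmt.Sprint(time.Now().Unix()), // active shard: skipped
		"index_2547",                  // pre-sharding name: never skipped
	}
	for _, name := range names {
		skip := filenameWithEpochRe.MatchString(name) && name >= uploadShardsBefore
		fmt.Printf("%-12s skip=%v\n", name, skip)
	}
}
```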
8 changes: 4 additions & 4 deletions pkg/storage/stores/shipper/uploads/table_manager.go
@@ -71,7 +71,7 @@ func (tm *TableManager) loop() {
for {
select {
case <-syncTicker.C:
- tm.uploadTables(context.Background())
+ tm.uploadTables(context.Background(), false)
case <-tm.ctx.Done():
return
}
Expand All @@ -84,7 +84,7 @@ func (tm *TableManager) Stop() {
tm.cancel()
tm.wg.Wait()

- tm.uploadTables(context.Background())
+ tm.uploadTables(context.Background(), true)
}

func (tm *TableManager) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback chunk_util.Callback) error {
@@ -151,15 +151,15 @@ func (tm *TableManager) getOrCreateTable(tableName string) (*Table, error) {
return table, nil
}

- func (tm *TableManager) uploadTables(ctx context.Context) {
+ func (tm *TableManager) uploadTables(ctx context.Context, force bool) {
tm.tablesMtx.RLock()
defer tm.tablesMtx.RUnlock()

level.Info(pkg_util.Logger).Log("msg", "uploading tables")

status := statusSuccess
for _, table := range tm.tables {
- err := table.Upload(ctx)
+ err := table.Upload(ctx, force)
if err != nil {
// continue uploading other tables while skipping cleanup for a failed one.
status = statusFailure
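The manager-side pattern, sketched below with simplified stand-in types: the ticker loop uploads with force=false so shards that may still receive writes are skipped, while Stop cancels the loop and runs one final pass with force=true so nothing is left behind on shutdown.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

type manager struct {
	ctx    context.Context
	cancel context.CancelFunc
	done   chan struct{}
}

func (m *manager) uploadTables(ctx context.Context, force bool) {
	fmt.Println("uploading tables, force =", force)
}

func (m *manager) loop() {
	defer close(m.done)
	ticker := time.NewTicker(50 * time.Millisecond) // short interval for demo only
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			m.uploadTables(context.Background(), false)
		case <-m.ctx.Done():
			return
		}
	}
}

func (m *manager) Stop() {
	m.cancel()
	<-m.done
	// final forced upload flushes even the active shard
	m.uploadTables(context.Background(), true)
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	m := &manager{ctx: ctx, cancel: cancel, done: make(chan struct{})}
	go m.loop()
	time.Sleep(120 * time.Millisecond)
	m.Stop()
}
```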
124 changes: 117 additions & 7 deletions pkg/storage/stores/shipper/uploads/table_test.go
@@ -101,22 +101,46 @@ func TestTable_Write(t *testing.T) {
}()

now := time.Now()

// allow modifying the last 5 shards
table.modifyShardsSince = now.Add(-5 * shardDBsByDuration).Unix()

// a couple of times for which we want to do writes to make the table create different shards
- writeTimes := []time.Time{now, now.Add(-(shardDBsByDuration + 5*time.Minute)), now.Add(-(shardDBsByDuration*3 + 3*time.Minute))}
+ testCases := []struct {
+ writeTime time.Time
+ dbName string // set only when it is supposed to be written to a different name than usual
+ }{
+ {
+ writeTime: now,
+ },
+ {
+ writeTime: now.Add(-(shardDBsByDuration + 5*time.Minute)),
+ },
+ {
+ writeTime: now.Add(-(shardDBsByDuration*3 + 3*time.Minute)),
+ },
+ {
+ writeTime: now.Add(-6 * shardDBsByDuration), // write with a time older than table.modifyShardsSince
+ dbName: fmt.Sprint(table.modifyShardsSince),
+ },
+ }

numFiles := 0

// perform writes and check whether the index gets written to the right shard
- for i, tm := range writeTimes {
+ for i, tc := range testCases {
t.Run(fmt.Sprint(i), func(t *testing.T) {
batch := boltIndexClient.NewWriteBatch()
testutil.AddRecordsToBatch(batch, "test", i*10, 10)
- require.NoError(t, table.write(context.Background(), tm, batch.(*local.BoltWriteBatch).Writes["test"]))
+ require.NoError(t, table.write(context.Background(), tc.writeTime, batch.(*local.BoltWriteBatch).Writes["test"]))

numFiles++
require.Equal(t, numFiles, len(table.dbs))

- expectedDBName := fmt.Sprint(tm.Truncate(shardDBsByDuration).Unix())
+ expectedDBName := tc.dbName
+ if expectedDBName == "" {
+ expectedDBName = fmt.Sprint(tc.writeTime.Truncate(shardDBsByDuration).Unix())
+ }
db, ok := table.dbs[expectedDBName]
require.True(t, ok)

@@ -147,7 +171,7 @@ func TestTable_Upload(t *testing.T) {
require.NoError(t, table.write(context.Background(), now, batch.(*local.BoltWriteBatch).Writes["test"]))

// upload the table
- require.NoError(t, table.Upload(context.Background()))
+ require.NoError(t, table.Upload(context.Background(), true))
require.Len(t, table.dbs, 1)

// compare the local dbs for the table with the dbs in remote storage after upload to ensure they have the same data
@@ -168,7 +192,7 @@ func TestTable_Upload(t *testing.T) {
require.NoError(t, table.write(context.Background(), now.Add(shardDBsByDuration), batch.(*local.BoltWriteBatch).Writes["test"]))

// upload the dbs to storage
- require.NoError(t, table.Upload(context.Background()))
+ require.NoError(t, table.Upload(context.Background(), true))
require.Len(t, table.dbs, 2)

// check local dbs against remote dbs to ensure they have the same data
@@ -226,7 +250,7 @@ func TestTable_Cleanup(t *testing.T) {
}()

// upload all the existing dbs
- require.NoError(t, table.Upload(context.Background()))
+ require.NoError(t, table.Upload(context.Background(), true))
require.Len(t, table.uploadedDBsMtime, 3)

// change the mtime of outsideRetentionButModified db after the upload
@@ -305,3 +329,89 @@ func Test_LoadBoltDBsFromDir(t *testing.T) {
require.NoError(t, boltdb.Close())
}
}

func TestTable_ImmutableUploads(t *testing.T) {
tempDir, err := ioutil.TempDir("", "table-writes")
require.NoError(t, err)

defer func() {
require.NoError(t, os.RemoveAll(tempDir))
}()

boltDBIndexClient, storageClient := buildTestClients(t, tempDir)
indexPath := filepath.Join(tempDir, indexDirName)

defer func() {
boltDBIndexClient.Stop()
}()

activeShard := time.Now().Truncate(shardDBsByDuration)

// some dbs to set up
dbNames := []int64{
activeShard.Add(-shardDBsByDuration).Unix(), // inactive shard, should upload
activeShard.Add(-2 * time.Minute).Unix(), // 2 minutes before active shard, should upload
activeShard.Unix(), // active shard, should not upload
}

dbs := map[string]testutil.DBRecords{}
for _, dbName := range dbNames {
dbs[fmt.Sprint(dbName)] = testutil.DBRecords{
NumRecords: 10,
}
}

// set up some dbs for a table at a path.
tablePath := testutil.SetupDBTablesAtPath(t, "test-table", indexPath, dbs)

table, err := LoadTable(tablePath, "test", storageClient, boltDBIndexClient)
require.NoError(t, err)
require.NotNil(t, table)

defer func() {
table.Stop()
}()

// dbs expected to be uploaded without forcing the upload
expectedDBsToUpload := []int64{dbNames[0], dbNames[1]}

// upload dbs without forcing it, which should skip the active shard and any shard that was active less than a minute ago.
require.NoError(t, table.Upload(context.Background(), false))

// verify that only expected dbs are uploaded
objectStorageDir := filepath.Join(tempDir, objectsStorageDirName)
uploadedDBs, err := ioutil.ReadDir(filepath.Join(objectStorageDir, table.name))
require.NoError(t, err)

require.Len(t, uploadedDBs, len(expectedDBsToUpload))
for _, expectedDB := range expectedDBsToUpload {
require.FileExists(t, filepath.Join(objectStorageDir, table.buildObjectKey(fmt.Sprint(expectedDB))))
}

// force upload of dbs
require.NoError(t, table.Upload(context.Background(), true))
expectedDBsToUpload = dbNames

// verify that all the dbs are uploaded
uploadedDBs, err = ioutil.ReadDir(filepath.Join(objectStorageDir, table.name))
require.NoError(t, err)

require.Len(t, uploadedDBs, len(expectedDBsToUpload))
for _, expectedDB := range expectedDBsToUpload {
require.FileExists(t, filepath.Join(objectStorageDir, table.buildObjectKey(fmt.Sprint(expectedDB))))
}

// delete everything uploaded
dir, err := ioutil.ReadDir(filepath.Join(objectStorageDir, table.name))
require.NoError(t, err)
for _, d := range dir {
require.NoError(t, os.RemoveAll(filepath.Join(objectStorageDir, table.name, d.Name())))
}

// force upload of dbs
require.NoError(t, table.Upload(context.Background(), true))

// make sure nothing was re-uploaded
for _, expectedDB := range expectedDBsToUpload {
require.NoFileExists(t, filepath.Join(objectStorageDir, table.buildObjectKey(fmt.Sprint(expectedDB))))
}
}
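Why the final assertion above holds: after a successful upload the table records each file's mtime (uploadedDBsMtime in the real code) and skips files whose mtime has not advanced since, so deleting the remote objects does not trigger a re-upload. A simplified sketch of that bookkeeping, with illustrative types and names:

```go
package main

import (
	"fmt"
	"io/ioutil"
	"os"
	"time"
)

type uploader struct {
	uploadedDBsMtime map[string]time.Time
}

// maybeUpload reports whether the file at path would be uploaded,
// skipping it when its mtime hasn't changed since the last upload.
func (u *uploader) maybeUpload(path string) (bool, error) {
	stat, err := os.Stat(path)
	if err != nil {
		return false, err
	}
	if prev, ok := u.uploadedDBsMtime[path]; ok && !stat.ModTime().After(prev) {
		return false, nil // already uploaded and unchanged since
	}
	// ... a real implementation would push the file to object storage here ...
	u.uploadedDBsMtime[path] = stat.ModTime()
	return true, nil
}

func main() {
	f, err := ioutil.TempFile("", "db")
	if err != nil {
		panic(err)
	}
	defer os.Remove(f.Name())

	u := &uploader{uploadedDBsMtime: map[string]time.Time{}}
	first, _ := u.maybeUpload(f.Name())
	second, _ := u.maybeUpload(f.Name()) // remote copy could be gone; still skipped
	fmt.Println(first, second)           // true false
}
```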
