From 9b0740c0ed9338be9a9127a3e15d758a949a4e17 Mon Sep 17 00:00:00 2001
From: Sandeep Sukhani
Date: Mon, 24 Aug 2020 17:21:45 +0530
Subject: [PATCH] upload boltdb files from shipper only when they are not expected to be modified or during shutdown (#2487)

* upload boltdb files from shipper only when they are not expected to be modified or during shutdown

* updated comment

* add one more test condition to make sure files are not re-uploaded

Co-authored-by: Edward Welch
---
 pkg/storage/stores/shipper/uploads/table.go | 31 ++++-
 .../stores/shipper/uploads/table_manager.go | 8 +-
 .../stores/shipper/uploads/table_test.go | 124 +++++++++++++++++-
 3 files changed, 149 insertions(+), 14 deletions(-)

diff --git a/pkg/storage/stores/shipper/uploads/table.go b/pkg/storage/stores/shipper/uploads/table.go
index fa02701e3dfc6..8724adc4394df 100644
--- a/pkg/storage/stores/shipper/uploads/table.go
+++ b/pkg/storage/stores/shipper/uploads/table.go
@@ -8,6 +8,7 @@ import (
 "os"
 "path"
 "path/filepath"
+ "regexp"
 "strings"
 "sync"
 "time"
@@ -55,6 +56,7 @@ type Table struct {
 uploadedDBsMtime map[string]time.Time
 uploadedDBsMtimeMtx sync.RWMutex
+ modifyShardsSince int64
 }

 // NewTable create a new Table without looking for any existing local dbs belonging to the table.
@@ -90,6 +92,7 @@ func newTableWithDBs(dbs map[string]*bbolt.DB, path, uploader string, storageCli
 boltdbIndexClient: boltdbIndexClient,
 dbs: dbs,
 uploadedDBsMtime: map[string]time.Time{},
+ modifyShardsSince: time.Now().Unix(),
 }, nil
 }

@@ -140,9 +143,13 @@ func (lt *Table) Write(ctx context.Context, writes local.TableWrites) error {
 // db files are named after the time shard i.e epoch of the truncated time.
 // If a db file does not exist for a shard it gets created.
 func (lt *Table) write(ctx context.Context, tm time.Time, writes local.TableWrites) error {
- shard := fmt.Sprint(tm.Truncate(shardDBsByDuration).Unix())
+ // do not write to files older than the init time, otherwise we might end up modifying a file which was already created and uploaded before the last shutdown.
+ shard := tm.Truncate(shardDBsByDuration).Unix()
+ if shard < lt.modifyShardsSince {
+ shard = lt.modifyShardsSince
+ }

- db, err := lt.getOrAddDB(shard)
+ db, err := lt.getOrAddDB(fmt.Sprint(shard))
 if err != nil {
 return err
 }
@@ -185,13 +192,31 @@ func (lt *Table) RemoveDB(name string) error {
 }

 // Upload uploads all the dbs which are never uploaded or have been modified since the last batch was uploaded.
-func (lt *Table) Upload(ctx context.Context) error {
+func (lt *Table) Upload(ctx context.Context, force bool) error {
 lt.dbsMtx.RLock()
 defer lt.dbsMtx.RUnlock()

+ // upload files excluding the active shard. It could happen that we just started a new shard but the file for the last shard is still being updated due to pending writes or a pending flush to disk.
+ // To avoid uploading it, also exclude the previously active shard if it became inactive less than a minute ago.
+ uploadShardsBefore := fmt.Sprint(time.Now().Add(-time.Minute).Truncate(shardDBsByDuration).Unix())
+
+ // Add a check to consider only files which are sharded and have just an epoch in their name.
+ // Before introducing sharding we had a single file per table which was moved inside the folder per table as part of the migration.
+ // The files were named with .
+ // Since sharding was introduced we have a new file every 15 mins and their names just include an epoch timestamp, e.g. `1597927538`.
+ // We can remove this check after we no longer support upgrading from 1.5.0.
+ filenameWithEpochRe, err := regexp.Compile(`^[0-9]{10}$`)
+ if err != nil {
+ return err
+ }
+
 level.Info(util.Logger).Log("msg", fmt.Sprintf("uploading table %s", lt.name))

 for name, db := range lt.dbs {
+ // doing a string comparison between unix timestamps in string form since they are anyway of the same length
+ if !force && filenameWithEpochRe.MatchString(name) && name >= uploadShardsBefore {
+ continue
+ }
 stat, err := os.Stat(db.Path())
 if err != nil {
 return err
diff --git a/pkg/storage/stores/shipper/uploads/table_manager.go b/pkg/storage/stores/shipper/uploads/table_manager.go
index 5d02382908dbb..cdc2425a7e8f5 100644
--- a/pkg/storage/stores/shipper/uploads/table_manager.go
+++ b/pkg/storage/stores/shipper/uploads/table_manager.go
@@ -71,7 +71,7 @@ func (tm *TableManager) loop() {
 for {
 select {
 case <-syncTicker.C:
- tm.uploadTables(context.Background())
+ tm.uploadTables(context.Background(), false)
 case <-tm.ctx.Done():
 return
 }
@@ -84,7 +84,7 @@ func (tm *TableManager) Stop() {
 tm.cancel()
 tm.wg.Wait()

- tm.uploadTables(context.Background())
+ tm.uploadTables(context.Background(), true)
 }

 func (tm *TableManager) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback chunk_util.Callback) error {
@@ -151,7 +151,7 @@ func (tm *TableManager) getOrCreateTable(tableName string) (*Table, error) {
 return table, nil
 }

-func (tm *TableManager) uploadTables(ctx context.Context) {
+func (tm *TableManager) uploadTables(ctx context.Context, force bool) {
 tm.tablesMtx.RLock()
 defer tm.tablesMtx.RUnlock()

 status := statusSuccess
 for _, table := range tm.tables {
- err := table.Upload(ctx)
+ err := table.Upload(ctx, force)
 if err != nil {
 // continue uploading other tables while skipping cleanup for a failed one.
 status = statusFailure
diff --git a/pkg/storage/stores/shipper/uploads/table_test.go b/pkg/storage/stores/shipper/uploads/table_test.go
index 0c45445dcaeb1..cd22b52fefb85 100644
--- a/pkg/storage/stores/shipper/uploads/table_test.go
+++ b/pkg/storage/stores/shipper/uploads/table_test.go
@@ -101,22 +101,46 @@ func TestTable_Write(t *testing.T) {
 }()

 now := time.Now()
+
+ // allow modifying last 5 shards
+ table.modifyShardsSince = now.Add(-5 * shardDBsByDuration).Unix()
+
 // a couple of times for which we want to do writes to make the table create different shards
- writeTimes := []time.Time{now, now.Add(-(shardDBsByDuration + 5*time.Minute)), now.Add(-(shardDBsByDuration*3 + 3*time.Minute))}
+ testCases := []struct {
+ writeTime time.Time
+ dbName string // set only when it is supposed to be written to a different name than usual
+ }{
+ {
+ writeTime: now,
+ },
+ {
+ writeTime: now.Add(-(shardDBsByDuration + 5*time.Minute)),
+ },
+ {
+ writeTime: now.Add(-(shardDBsByDuration*3 + 3*time.Minute)),
+ },
+ {
+ writeTime: now.Add(-6 * shardDBsByDuration), // write with time older than table.modifyShardsSince
+ dbName: fmt.Sprint(table.modifyShardsSince),
+ },
+ }

 numFiles := 0

 // performing writes and checking whether the index gets written to right shard
- for i, tm := range writeTimes {
+ for i, tc := range testCases {
 t.Run(fmt.Sprint(i), func(t *testing.T) {
 batch := boltIndexClient.NewWriteBatch()
 testutil.AddRecordsToBatch(batch, "test", i*10, 10)
- require.NoError(t, table.write(context.Background(), tm, batch.(*local.BoltWriteBatch).Writes["test"]))
+ require.NoError(t, table.write(context.Background(), tc.writeTime, batch.(*local.BoltWriteBatch).Writes["test"]))

 numFiles++
 require.Equal(t, numFiles, len(table.dbs))

- expectedDBName := fmt.Sprint(tm.Truncate(shardDBsByDuration).Unix())
+ expectedDBName := tc.dbName
+ if expectedDBName == "" {
+ expectedDBName = fmt.Sprint(tc.writeTime.Truncate(shardDBsByDuration).Unix())
+ }
 db, ok := table.dbs[expectedDBName]
 require.True(t, ok)
@@ -147,7 +171,7 @@ func TestTable_Upload(t *testing.T) {
 require.NoError(t, table.write(context.Background(), now, batch.(*local.BoltWriteBatch).Writes["test"]))

 // upload the table
- require.NoError(t, table.Upload(context.Background()))
+ require.NoError(t, table.Upload(context.Background(), true))
 require.Len(t, table.dbs, 1)

 // compare the local dbs for the table with the dbs in remote storage after upload to ensure they have same data
@@ -168,7 +192,7 @@ func TestTable_Upload(t *testing.T) {
 require.NoError(t, table.write(context.Background(), now.Add(shardDBsByDuration), batch.(*local.BoltWriteBatch).Writes["test"]))

 // upload the dbs to storage
- require.NoError(t, table.Upload(context.Background()))
+ require.NoError(t, table.Upload(context.Background(), true))
 require.Len(t, table.dbs, 2)

 // check local dbs with remote dbs to ensure they have same data
@@ -226,7 +250,7 @@ func TestTable_Cleanup(t *testing.T) {
 }()

 // upload all the existing dbs
- require.NoError(t, table.Upload(context.Background()))
+ require.NoError(t, table.Upload(context.Background(), true))
 require.Len(t, table.uploadedDBsMtime, 3)

 // change the mtime of outsideRetentionButModified db after the upload
@@ -305,3 +329,89 @@ func Test_LoadBoltDBsFromDir(t *testing.T) {
 require.NoError(t, boltdb.Close())
 }
 }
+
+func TestTable_ImmutableUploads(t *testing.T) {
+ tempDir, err := ioutil.TempDir("", "table-writes")
+ require.NoError(t, err)
+
+ defer func() {
+ require.NoError(t, os.RemoveAll(tempDir))
+ }()
+
+ boltDBIndexClient, storageClient := buildTestClients(t, tempDir)
+ indexPath := filepath.Join(tempDir, indexDirName)
+
+ defer func() {
+ boltDBIndexClient.Stop()
+ }()
+
+ activeShard := time.Now().Truncate(shardDBsByDuration)
+
+ // some dbs to set up
+ dbNames := []int64{
+ activeShard.Add(-shardDBsByDuration).Unix(), // inactive shard, should upload
+ activeShard.Add(-2 * time.Minute).Unix(), // 2 minutes before active shard, should upload
+ activeShard.Unix(), // active shard, should not upload
+ }
+
+ dbs := map[string]testutil.DBRecords{}
+ for _, dbName := range dbNames {
+ dbs[fmt.Sprint(dbName)] = testutil.DBRecords{
+ NumRecords: 10,
+ }
+ }
+
+ // set up some dbs for a table at a path.
+ tablePath := testutil.SetupDBTablesAtPath(t, "test-table", indexPath, dbs)
+
+ table, err := LoadTable(tablePath, "test", storageClient, boltDBIndexClient)
+ require.NoError(t, err)
+ require.NotNil(t, table)
+
+ defer func() {
+ table.Stop()
+ }()
+
+ // dbs expected to be uploaded without forcing the upload
+ expectedDBsToUpload := []int64{dbNames[0], dbNames[1]}
+
+ // upload dbs without forcing the upload, which should skip the active shard and any shard that became inactive less than a minute ago.
+ require.NoError(t, table.Upload(context.Background(), false))
+
+ // verify that only the expected dbs are uploaded
+ objectStorageDir := filepath.Join(tempDir, objectsStorageDirName)
+ uploadedDBs, err := ioutil.ReadDir(filepath.Join(objectStorageDir, table.name))
+ require.NoError(t, err)
+
+ require.Len(t, uploadedDBs, len(expectedDBsToUpload))
+ for _, expectedDB := range expectedDBsToUpload {
+ require.FileExists(t, filepath.Join(objectStorageDir, table.buildObjectKey(fmt.Sprint(expectedDB))))
+ }
+
+ // force upload of dbs
+ require.NoError(t, table.Upload(context.Background(), true))
+ expectedDBsToUpload = dbNames
+
+ // verify that all the dbs are uploaded
+ uploadedDBs, err = ioutil.ReadDir(filepath.Join(objectStorageDir, table.name))
+ require.NoError(t, err)
+
+ require.Len(t, uploadedDBs, len(expectedDBsToUpload))
+ for _, expectedDB := range expectedDBsToUpload {
+ require.FileExists(t, filepath.Join(objectStorageDir, table.buildObjectKey(fmt.Sprint(expectedDB))))
+ }
+
+ // delete everything uploaded
+ dir, err := ioutil.ReadDir(filepath.Join(objectStorageDir, table.name))
+ for _, d := range dir {
+ os.RemoveAll(filepath.Join(objectStorageDir, table.name, d.Name()))
+ }
+
+ // force upload of dbs
+ require.NoError(t, table.Upload(context.Background(), true))
+
+ // make sure nothing was re-uploaded
+ for _, expectedDB := range expectedDBsToUpload {
+ require.NoFileExists(t, filepath.Join(objectStorageDir, table.buildObjectKey(fmt.Sprint(expectedDB))))
+ }
+}
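For reference, below is a minimal standalone Go sketch (separate from the patch itself, not code from the Loki tree) of the two behaviours the change introduces: writes are clamped to shards no older than the uploader's start time, and non-forced uploads skip any epoch-named db that is, or until less than a minute ago was, the active shard. The 15-minute shard duration matches the comment in table.go above; the helper names shardFor and shouldUpload are local to this sketch and are assumptions, not names from the patch.

package main

import (
    "fmt"
    "regexp"
    "time"
)

// shardDBsByDuration mirrors the 15-minute sharding interval described in the patch.
const shardDBsByDuration = 15 * time.Minute

// filenameWithEpochRe matches db files whose name is just a 10-digit epoch, as in the patch.
var filenameWithEpochRe = regexp.MustCompile(`^[0-9]{10}$`)

// shardFor picks the db a write lands in: the epoch of the truncated write time,
// clamped to modifyShardsSince so files created before the last restart are never modified again.
func shardFor(tm time.Time, modifyShardsSince int64) string {
    shard := tm.Truncate(shardDBsByDuration).Unix()
    if shard < modifyShardsSince {
        shard = modifyShardsSince
    }
    return fmt.Sprint(shard)
}

// shouldUpload mirrors the non-forced upload filter: epoch-named dbs that are still the
// active shard (or became inactive less than a minute ago) are skipped unless force is set.
// Comparing the names as strings is safe because both operands are 10-digit epochs.
func shouldUpload(name string, now time.Time, force bool) bool {
    uploadShardsBefore := fmt.Sprint(now.Add(-time.Minute).Truncate(shardDBsByDuration).Unix())
    if !force && filenameWithEpochRe.MatchString(name) && name >= uploadShardsBefore {
        return false
    }
    return true
}

func main() {
    now := time.Now()
    modifySince := now.Add(-30 * time.Minute).Unix()

    // A write with a timestamp older than modifySince is redirected into the clamped shard.
    fmt.Println(shardFor(now.Add(-2*time.Hour), modifySince) == fmt.Sprint(modifySince)) // true

    activeShard := fmt.Sprint(now.Truncate(shardDBsByDuration).Unix())
    fmt.Println(shouldUpload(activeShard, now, false)) // false: active shard is skipped
    fmt.Println(shouldUpload(activeShard, now, true))  // true: forced upload, e.g. during shutdown
}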