Merge #93324
93324: ccl/backupccl: add memory monitor to external SST iterators in restore r=lidorcarmel a=rhu713

Previously, there was no limit on the amount of memory that could be used while
constructing external SST iterators during restore. This patch adds a memory
monitor to limit the amount of memory that can be used to construct external
SST iterators. If a restore processor fails to acquire enough memory to open
the next file for a restore span, it sends an iterator over all of the files it
has opened so far, and waits until it can acquire the memory needed to resume
constructing the iterator for the remaining files.
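
As a rough illustration of that acquire-or-flush loop, here is a minimal,
self-contained Go sketch; the `memBudget` type, `processSpan`, and the file
sizes are illustrative stand-ins, not the code from this patch (the actual
change uses the memory-backed quota pool added under `backuputils` in the
diff below).

```go
package main

import (
	"context"
	"fmt"
)

// memBudget stands in for the per-processor memory monitor: a fixed byte
// budget from which file-open costs are reserved and later released.
type memBudget struct{ avail int64 }

func (b *memBudget) tryAcquire(n int64) bool {
	if n > b.avail {
		return false
	}
	b.avail -= n
	return true
}

func (b *memBudget) release(n int64) { b.avail += n }

// processSpan sketches the behavior described above: open as many files as the
// budget allows, send one iterator over that batch, release the batch's
// memory, and repeat until every file in the span has been handled.
func processSpan(ctx context.Context, budget *memBudget, fileSizes []int64) error {
	var batch []int64 // sizes of the files whose memory is currently reserved
	sendBatch := func() error {
		fmt.Printf("sending iterator over %d open files\n", len(batch))
		for _, sz := range batch {
			budget.release(sz)
		}
		batch = batch[:0]
		return ctx.Err()
	}
	for i := 0; i < len(fileSizes); {
		if budget.tryAcquire(fileSizes[i]) {
			batch = append(batch, fileSizes[i])
			i++
			continue
		}
		if len(batch) == 0 {
			// The real processor blocks here until memory frees up; in this
			// sketch a single file larger than the whole budget is an error.
			return fmt.Errorf("file of %d bytes exceeds the budget", fileSizes[i])
		}
		// Not enough memory for the next file: flush the files accumulated so
		// far, then retry the same file against the freed-up budget.
		if err := sendBatch(); err != nil {
			return err
		}
	}
	if len(batch) > 0 {
		return sendBatch()
	}
	return nil
}

func main() {
	budget := &memBudget{avail: 64 << 20} // 64 MiB per-processor budget
	files := []int64{20 << 20, 20 << 20, 20 << 20, 20 << 20, 20 << 20}
	if err := processSpan(context.Background(), budget, files); err != nil {
		fmt.Println("restore span failed:", err)
	}
}
```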

The memory limit can be controlled via the new cluster setting
`bulkio.restore.per_processor_memory_limit`. Regardless of the setting,
however, the amount of memory used will not exceed
`COCKROACH_RESTORE_MEM_FRACTION` * `max SQL memory`. The new environment
variable `COCKROACH_RESTORE_MEM_FRACTION` defaults to 0.5.
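
Roughly, the effective per-processor budget combines the two knobs like the
following sketch (illustrative names, not the actual variables in the patch):

```go
package main

import "fmt"

// effectiveRestoreMemLimit sketches how the two knobs described above combine:
// the cluster setting requests a per-processor budget, but the budget is
// capped at COCKROACH_RESTORE_MEM_FRACTION of --max-sql-memory.
func effectiveRestoreMemLimit(perProcessorLimit, maxSQLMemory int64, memFraction float64) int64 {
	capBytes := int64(memFraction * float64(maxSQLMemory))
	if perProcessorLimit > capBytes {
		return capBytes
	}
	return perProcessorLimit
}

func main() {
	// With --max-sql-memory=4GiB and the default fraction of 0.5, a setting of
	// 3GiB is clamped to 2GiB.
	fmt.Println(effectiveRestoreMemLimit(3<<30, 4<<30, 0.5)) // 2147483648
}
```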

Benchmark results from 10 iterators of 100 files each, where each file is 24 MiB in size:
```
./dev bench ./pkg/ccl/backupccl/  --filter BenchmarkIteratorMemory/fileCount=100$/iterCount=10$/rows=200000$/enc=false  --bench-mem -- --test_env=COCKROACH_GCS_SST_DIR=gs://rui-backup-test/bench --test_env=COCKROACH_S3_SST_DIR=s3://rui-crl/bench (plus credentials environment variables...)

GCS unencrypted: 5.0 MiB/file  
  5246162544 B/op

GCS encrypted: 8.9 MiB/file
  9404394496 B/op

S3 unencrypted: 1.4 MiB/file 
  1511477488 B/op

S3 encrypted: 1.7 MiB/file
  1834325232 B/op
```

Fixes: #102722 

Release note: None


Co-authored-by: Rui Hu <[email protected]>
craig[bot] and Rui Hu committed May 11, 2023
2 parents 7a0fb5b + 84ed8ac commit f04439c
Showing 20 changed files with 1,113 additions and 94 deletions.
2 changes: 2 additions & 0 deletions pkg/BUILD.bazel
@@ -18,6 +18,7 @@ ALL_TESTS = [
"//pkg/ccl/backupccl/backupinfo:backupinfo_test",
"//pkg/ccl/backupccl/backuprand:backuprand_test",
"//pkg/ccl/backupccl/backupresolver:backupresolver_test",
"//pkg/ccl/backupccl/backuputils:backuputils_test",
"//pkg/ccl/backupccl:backupccl_test",
"//pkg/ccl/baseccl:baseccl_test",
"//pkg/ccl/benchccl/rttanalysisccl:rttanalysisccl_test",
@@ -746,6 +747,7 @@ GO_TARGETS = [
"//pkg/ccl/backupccl/backupresolver:backupresolver_test",
"//pkg/ccl/backupccl/backuptestutils:backuptestutils",
"//pkg/ccl/backupccl/backuputils:backuputils",
"//pkg/ccl/backupccl/backuputils:backuputils_test",
"//pkg/ccl/backupccl:backupccl",
"//pkg/ccl/backupccl:backupccl_test",
"//pkg/ccl/baseccl:baseccl",
4 changes: 4 additions & 0 deletions pkg/ccl/backupccl/BUILD.bazel
@@ -142,6 +142,7 @@ go_library(
"//pkg/util/metric",
"//pkg/util/mon",
"//pkg/util/protoutil",
"//pkg/util/quotapool",
"//pkg/util/randutil",
"//pkg/util/retry",
"//pkg/util/span",
@@ -211,6 +212,7 @@ go_test(
"//pkg/ccl/backupccl/backupinfo",
"//pkg/ccl/backupccl/backuppb",
"//pkg/ccl/backupccl/backuptestutils",
"//pkg/ccl/backupccl/backuputils",
"//pkg/ccl/kvccl",
"//pkg/ccl/multiregionccl",
"//pkg/ccl/multiregionccl/multiregionccltestutils",
@@ -292,7 +294,9 @@ go_test(
"//pkg/util/admission",
"//pkg/util/ctxgroup",
"//pkg/util/encoding",
"//pkg/util/envutil",
"//pkg/util/hlc",
"//pkg/util/humanizeutil",
"//pkg/util/ioctx",
"//pkg/util/leaktest",
"//pkg/util/limit",
193 changes: 190 additions & 3 deletions pkg/ccl/backupccl/backup_test.go
@@ -1441,7 +1441,7 @@ func checkInProgressBackupRestore(
<-exportSpanCompleteCh
}
},
RunAfterProcessingRestoreSpanEntry: func(_ context.Context) {
RunAfterProcessingRestoreSpanEntry: func(_ context.Context, _ *execinfrapb.RestoreSpanEntry) {
<-allowResponse
},
},
@@ -1612,7 +1612,7 @@ func TestRestoreCheckpointing(t *testing.T) {
knobs := base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{
BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{
RunAfterProcessingRestoreSpanEntry: func(_ context.Context) {
RunAfterProcessingRestoreSpanEntry: func(_ context.Context, _ *execinfrapb.RestoreSpanEntry) {
// Because the restore processor has several workers that
// concurrently send addsstable requests and because all workers will
// wait on the lock below, when one flush gets blocked on the
@@ -7407,7 +7407,7 @@ func TestClientDisconnect(t *testing.T) {

args := base.TestClusterArgs{}
knobs := base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{RunAfterProcessingRestoreSpanEntry: func(ctx context.Context) {
DistSQL: &execinfra.TestingKnobs{BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{RunAfterProcessingRestoreSpanEntry: func(ctx context.Context, _ *execinfrapb.RestoreSpanEntry) {
blockBackupOrRestore(ctx)
}}},
Store: &kvserver.StoreTestingKnobs{
@@ -11090,3 +11090,190 @@ CREATE TABLE child_pk (k INT8 PRIMARY KEY REFERENCES parent);
sqlDB.Exec(t, `DROP DATABASE test`)
}
}

// Verify that during restore, if a restore span has too many files to fit in
// the memory budget with a single SST iterator, the restore processor should
// repeatedly open and process iterators for as many files as can fit within the
// budget until the span is finished.
func TestRestoreMemoryMonitoring(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
skip.UnderRace(t, "the largest tests are too slow to run under race")
skip.UnderStress(t, "the largest tests are too slow to run under stress")

const splitSize = 10
for _, numSplits := range []int{10, 100, 200} {
for _, numInc := range []int{0, 1, 3, 10} {
for _, restoreProcessorMaxFiles := range []int{5, 10, 20} {
t.Run(fmt.Sprintf("splits=%d/inc=%d/procMaxFiles=%d", numSplits, numInc, restoreProcessorMaxFiles), func(t *testing.T) {
numAccounts := numSplits * splitSize
var expectedNumFiles int
restoreProcessorKnobCount := atomic.Uint32{}
args := base.TestServerArgs{
DefaultTestTenant: base.TestTenantDisabled,
SQLMemoryPoolSize: 1 << 30, // Large enough for all mem limit settings.
Knobs: base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{
BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{
RunAfterProcessingRestoreSpanEntry: func(ctx context.Context, entry *execinfrapb.RestoreSpanEntry) {
// The total size of the backup files should be less than the target
// SST size, thus should all fit in one import span.
require.Equal(t, expectedNumFiles, len(entry.Files))
restoreProcessorKnobCount.Add(1)
},
},
},
},
}
params := base.TestClusterArgs{ServerArgs: args}
_, sqlDB, _, cleanupFn := backupRestoreTestSetupWithParams(t, singleNode, numAccounts, InitManualReplication, params)
defer cleanupFn()

sqlDB.Exec(t, "SET CLUSTER SETTING bulkio.restore.memory_monitor_ssts=true")
sqlDB.Exec(t, "SET CLUSTER SETTING kv.bulk_io_write.restore_node_concurrency=2")

// Add some splits in the table, and set the target file size to be something
// small so that we get one flushed file per split in the backup.
sqlDB.Exec(t, "ALTER TABLE data.bank SPLIT AT SELECT generate_series($1::INT, $2, $3)", 0, numAccounts, splitSize)
sqlDB.Exec(t, "SET CLUSTER SETTING bulkio.backup.file_size = '1b'")
sqlDB.Exec(t, "BACKUP data.bank INTO 'userfile:///backup'")

// Take some incremental backups after mutating some rows. Take note of the
// splits that have been changed as that determines the number of incremental
// files that are created.
var numIncFiles int
for i := 0; i < numInc; i++ {
incSplitsWithFile := make(map[int]bool)

for n := 0; n < 100; n++ {
id := rand.Intn(numAccounts)
sqlDB.Exec(t, `UPDATE data.bank SET balance = balance + 1 WHERE id = $1`, id)
split := id / splitSize
incSplitsWithFile[split] = true
}

sqlDB.Exec(t, `BACKUP data.bank INTO latest IN 'userfile:///backup' WITH revision_history`)
numIncFiles += len(incSplitsWithFile)
}

expectedNumFiles += numSplits + numIncFiles
// Verify the file counts in the backup.
var numFiles int
sqlDB.QueryRow(t, "SELECT count(*) FROM [SHOW BACKUP FILES FROM latest IN 'userfile:///backup']").Scan(&numFiles)
require.Equal(t, expectedNumFiles, numFiles)

sqlDB.Exec(t, "SET CLUSTER SETTING bulkio.restore.per_processor_memory_limit = $1", restoreProcessorMaxFiles*sstReaderOverheadBytesPerFile)

sqlDB.Exec(t, "CREATE DATABASE data2")
sqlDB.Exec(t, "RESTORE data.bank FROM latest IN 'userfile:///backup' WITH OPTIONS (into_db='data2')")

// Assert that the restore processor is processing the same span multiple
// times, and the count is based on what's expected from the memory budget.
// The expected number is just the ceiling of expectedNumFiles/restoreProcessorMaxFiles.
require.Equal(t, (expectedNumFiles-1)/restoreProcessorMaxFiles+1, int(restoreProcessorKnobCount.Load()))

// Verify data in the restored table.
expectedFingerprints := sqlDB.QueryStr(t, "SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE data.bank")
actualFingerprints := sqlDB.QueryStr(t, "SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE data2.bank")
require.Equal(t, expectedFingerprints, actualFingerprints)
})
}
}
}
}

// Verify that restore with memory monitoring should be able to succeed with
// partial SST iterators that shadow previously written values.
func TestRestoreMemoryMonitoringWithShadowing(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

const numAccounts = 10
const numIncrementals = 10
const restoreProcessorMaxFiles = 5

restoreProcessorKnobCount := atomic.Uint32{}

args := base.TestServerArgs{
DefaultTestTenant: base.TestTenantDisabled,
Knobs: base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{
BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{
RunAfterProcessingRestoreSpanEntry: func(ctx context.Context, entry *execinfrapb.RestoreSpanEntry) {
restoreProcessorKnobCount.Add(1)
},
},
},
},
}
params := base.TestClusterArgs{ServerArgs: args}
_, sqlDB, _, cleanupFn := backupRestoreTestSetupWithParams(t, singleNode, numAccounts, InitManualReplication, params)
defer cleanupFn()

sqlDB.Exec(t, "SET CLUSTER SETTING kv.bulk_io_write.restore_node_concurrency = 1")
sqlDB.Exec(t, "SET CLUSTER SETTING bulkio.restore.memory_monitor_ssts=true")
sqlDB.Exec(t, "BACKUP data.bank INTO 'userfile:///backup'")

// Repeatedly alter a single row and do an incremental backup.
for i := 0; i < numIncrementals; i++ {
sqlDB.Exec(t, `UPDATE data.bank SET balance = $1 WHERE id = $2`, 1000+i, i)
sqlDB.Exec(t, "BACKUP data.bank INTO latest IN 'userfile:///backup'")
}

// Set the memory budget for the restore processor to be enough to open 5
// files.
sqlDB.Exec(t, "SET CLUSTER SETTING bulkio.restore.per_processor_memory_limit = $1", restoreProcessorMaxFiles*sstReaderOverheadBytesPerFile)

sqlDB.Exec(t, "CREATE DATABASE data2")
sqlDB.Exec(t, "RESTORE data.bank FROM latest IN 'userfile:///backup' WITH OPTIONS (into_db='data2')")
files := sqlDB.QueryStr(t, "SHOW BACKUP FILES FROM latest IN 'userfile:///backup'")
require.Equal(t, 11, len(files)) // 1 file for full + 10 for 10 incrementals

// Assert that the restore processor is processing the same span multiple
// times, and the count is based on what's expected from the memory budget.
require.Equal(t, 3, int(restoreProcessorKnobCount.Load())) // Ceiling(11/5)

// Verify data in the restored table.
expectedFingerprints := sqlDB.QueryStr(t, "SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE data.bank")
actualFingerprints := sqlDB.QueryStr(t, "SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE data2.bank")
require.Equal(t, expectedFingerprints, actualFingerprints)
}

// TestRestoreMemoryMonitoringMinWorkerMemory tests that restore properly fails
// fast if there's not enough memory to reserve for the minimum number of
// workers.
func TestRestoreMemoryMonitoringMinWorkerMemory(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
const numAccounts = 100

args := base.TestServerArgs{
DefaultTestTenant: base.TestTenantDisabled,
}
params := base.TestClusterArgs{ServerArgs: args}
_, sqlDB, _, cleanupFn := backupRestoreTestSetupWithParams(t, singleNode, numAccounts, InitManualReplication, params)
defer cleanupFn()

// 4 restore workers means we need minimum 2 workers to start restore.
sqlDB.Exec(t, "SET CLUSTER SETTING kv.bulk_io_write.restore_node_concurrency=4")
sqlDB.Exec(t, "SET CLUSTER SETTING bulkio.restore.memory_monitor_ssts=true")

sqlDB.Exec(t, "BACKUP data.bank INTO 'userfile:///backup'")

// Set the budget to be 1 byte lower than minimum mem for 2 workers. This
// restore should fail.
sqlDB.Exec(t, "SET CLUSTER SETTING bulkio.restore.per_processor_memory_limit = $1", 2*minWorkerMemReservation-1)
sqlDB.Exec(t, "CREATE DATABASE restore_fail")
sqlDB.ExpectErr(t, "insufficient memory", "RESTORE data.bank FROM latest IN 'userfile:///backup' WITH into_db='restore_fail'")

// Set the budget to be equal to the minimum mem for 2 workers. The restore
// should succeed.
sqlDB.Exec(t, "SET CLUSTER SETTING bulkio.restore.per_processor_memory_limit = $1", 2*minWorkerMemReservation)
sqlDB.Exec(t, "CREATE DATABASE restore")
sqlDB.Exec(t, "RESTORE data.bank FROM latest IN 'userfile:///backup' WITH into_db='restore'")

// Verify data in the restored table.
expectedFingerprints := sqlDB.QueryStr(t, "SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE data.bank")
actualFingerprints := sqlDB.QueryStr(t, "SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE restore.bank")
require.Equal(t, expectedFingerprints, actualFingerprints)
}
4 changes: 4 additions & 0 deletions pkg/ccl/backupccl/backuppb/backup.proto
@@ -194,6 +194,10 @@ message RestoreProgress {
roachpb.RowCount summary = 1 [(gogoproto.nullable) = false];
int64 progressIdx = 2;
roachpb.Span dataSpan = 3 [(gogoproto.nullable) = false];
// CompleteUpTo is the timestamp that the data in DataSpan has been processed
// up to so far. The entire span has been processed if this timestamp is equal
// to restore end time.
util.hlc.Timestamp complete_up_to = 4 [(gogoproto.nullable) = false];
}

message BackupProcessorPlanningTraceEvent {
33 changes: 30 additions & 3 deletions pkg/ccl/backupccl/backuputils/BUILD.bazel
@@ -1,12 +1,39 @@
load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data")
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "backuputils",
srcs = ["utils.go"],
srcs = [
"memory_backed_quota_pool.go",
"utils.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuputils",
visibility = ["//visibility:public"],
deps = ["//pkg/cloud"],
deps = [
"//pkg/cloud",
"//pkg/util/mon",
"//pkg/util/quotapool",
"//pkg/util/syncutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
],
)

go_test(
name = "backuputils_test",
srcs = ["memory_backed_quota_pool_test.go"],
args = ["-test.timeout=295s"],
embed = [":backuputils"],
tags = ["ccl_test"],
deps = [
"//pkg/settings/cluster",
"//pkg/util/leaktest",
"//pkg/util/mon",
"//pkg/util/quotapool",
"//pkg/util/syncutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
],
)

get_x_data(name = "get_x_data")
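
The new `memory_backed_quota_pool.go` registered above is not rendered in this
view. Conceptually, such a pool hands out byte reservations against a fixed
budget and blocks acquirers until earlier reservations are released. Below is
a standalone sketch of that idea using plain `sync` primitives; the actual
implementation is built on the `quotapool` and `mon` packages listed in the
deps above and is not reproduced here.

```go
package main

import (
	"fmt"
	"sync"
)

// memQuotaPool is a toy memory-limited quota pool: Acquire blocks until the
// requested bytes fit under the limit, and Release wakes blocked acquirers.
type memQuotaPool struct {
	mu    sync.Mutex
	cond  *sync.Cond
	limit int64
	used  int64
}

func newMemQuotaPool(limit int64) *memQuotaPool {
	p := &memQuotaPool{limit: limit}
	p.cond = sync.NewCond(&p.mu)
	return p
}

// TryAcquire reserves n bytes if they fit under the limit right now.
func (p *memQuotaPool) TryAcquire(n int64) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.used+n > p.limit {
		return false
	}
	p.used += n
	return true
}

// Acquire blocks until n bytes can be reserved.
func (p *memQuotaPool) Acquire(n int64) {
	p.mu.Lock()
	defer p.mu.Unlock()
	for p.used+n > p.limit {
		p.cond.Wait()
	}
	p.used += n
}

// Release returns n bytes to the pool and wakes up any blocked acquirers.
func (p *memQuotaPool) Release(n int64) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.used -= n
	p.cond.Broadcast()
}

func main() {
	p := newMemQuotaPool(1 << 20)
	fmt.Println(p.TryAcquire(1 << 20)) // true: fits exactly
	fmt.Println(p.TryAcquire(1))       // false: pool is exhausted
	p.Release(1 << 20)
	fmt.Println(p.TryAcquire(1)) // true again
}
```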