diff --git a/br/pkg/lightning/checkpoints/checkpoints.go b/br/pkg/lightning/checkpoints/checkpoints.go index 65ea720a1f455..76d7a9aa84353 100644 --- a/br/pkg/lightning/checkpoints/checkpoints.go +++ b/br/pkg/lightning/checkpoints/checkpoints.go @@ -1650,7 +1650,7 @@ func (cpdb *MySQLCheckpointsDB) DestroyErrorCheckpoint(ctx context.Context, tabl DELETE FROM %[1]s.%[2]s WHERE table_name IN (SELECT table_name FROM %[1]s.%[3]s WHERE status <= ?) `, cpdb.schema, CheckpointTableNameEngine, CheckpointTableNameTable) deleteTableQuery = common.SprintfWithIdentifiers(` - DELETE FROM %s.%s status <= ? + DELETE FROM %s.%s WHERE status <= ? `, cpdb.schema, CheckpointTableNameTable) args = []any{CheckpointStatusMaxInvalid} } else { diff --git a/br/pkg/lightning/config/BUILD.bazel b/br/pkg/lightning/config/BUILD.bazel index 9c54f5609fc93..1ae65a73dfd5e 100644 --- a/br/pkg/lightning/config/BUILD.bazel +++ b/br/pkg/lightning/config/BUILD.bazel @@ -42,7 +42,7 @@ go_test( ], embed = [":config"], flaky = True, - shard_count = 47, + shard_count = 48, deps = [ "//br/pkg/lightning/common", "@com_github_burntsushi_toml//:toml", diff --git a/br/pkg/lightning/config/config.go b/br/pkg/lightning/config/config.go index b3bbd66fa33af..28c15d93d7b6d 100644 --- a/br/pkg/lightning/config/config.go +++ b/br/pkg/lightning/config/config.go @@ -100,6 +100,7 @@ const ( DefaultEngineMemCacheSize = 512 * units.MiB DefaultLocalWriterMemCacheSize = 128 * units.MiB + DefaultBlockSize = 16 * units.KiB defaultCSVDataCharacterSet = "binary" defaultCSVDataInvalidCharReplace = utf8.RuneError @@ -1115,6 +1116,9 @@ func (t *TikvImporter) adjust() error { if t.LocalWriterMemCacheSize == 0 { t.LocalWriterMemCacheSize = DefaultLocalWriterMemCacheSize } + if t.BlockSize == 0 { + t.BlockSize = DefaultBlockSize + } if t.ParallelImport && t.AddIndexBySQL { return common.ErrInvalidConfig. diff --git a/br/pkg/lightning/config/config_test.go b/br/pkg/lightning/config/config_test.go index 1a1386ff75849..e88c069b6c0c8 100644 --- a/br/pkg/lightning/config/config_test.go +++ b/br/pkg/lightning/config/config_test.go @@ -1299,3 +1299,16 @@ func TestAdjustConflict(t *testing.T) { cfg.Conflict.MaxRecordRows = 1 require.ErrorContains(t, cfg.Conflict.adjust(&cfg.TikvImporter, &cfg.App), `cannot record duplication (conflict.max-record-rows > 0) when use tikv-importer.backend = "tidb" and conflict.strategy = "replace"`) } + +func TestAdjustBlockSize(t *testing.T) { + cfg := NewConfig() + cfg.TikvImporter.Backend = BackendLocal + cfg.TikvImporter.SortedKVDir = "." + cfg.TiDB.DistSQLScanConcurrency = 1 + cfg.Mydumper.SourceDir = "." + cfg.TikvImporter.BlockSize = 0 + + err := cfg.Adjust(context.Background()) + require.Error(t, err) + require.Equal(t, ByteSize(16384), cfg.TikvImporter.BlockSize) +} diff --git a/pkg/executor/importer/import.go b/pkg/executor/importer/import.go index 0700d9932531e..b4e04bfe4dd2c 100644 --- a/pkg/executor/importer/import.go +++ b/pkg/executor/importer/import.go @@ -1331,6 +1331,7 @@ func (e *LoadDataController) getLocalBackendCfg(pdAddr, dataDir string) local.Ba KeyspaceName: tidb.GetGlobalKeyspaceName(), PausePDSchedulerScope: config.PausePDSchedulerScopeTable, DisableAutomaticCompactions: true, + BlockSize: config.DefaultBlockSize, } if e.IsRaftKV2 { backendConfig.RaftKV2SwitchModeDuration = config.DefaultSwitchTiKVModeInterval