Skip to content

Commit

Permalink
Merge #76834
Browse files Browse the repository at this point in the history
76834: sql: execute batch statements in an implicit transaction r=otan a=rafiss

fixes #44803
relates to #76490

Release justification: high value bug fix to existing functionality 

### *: prepare tests for batch stmt txn change

This commit will make the next one easier to review.

### sql: execute batch statements in an implicit transaction

Release note (bug fix): Previously statements that arrived in a batch
during the simple query protocol would all execute in their own implicit
transactions. Now, we match the Postgres wire protocol, so all these
statements share the same implicit transaction. If a BEGIN is included
in a statement batch, then the existing implicit transaction is
upgraded to an explicit transaction.

### sql: add session var for old implicit txn behavior

Release note (sql change): The enable_implicit_transaction_for_batch_statements
session variable was added. It defaults to true. When it is true,
multiple statements in a single query (a.k.a. a "batch statement") will
all be run in the same implicit transaction, which matches the Postgres
wire protocol. This setting is provided for users who want to preserve
the behavior of CockroachDB versions v21.2 and earlier.

Co-authored-by: Rafi Shamim <[email protected]>
  • Loading branch information
craig[bot] and rafiss committed Mar 15, 2022
2 parents c954f7a + 4fc43c8 commit cfe38f2
Show file tree
Hide file tree
Showing 147 changed files with 2,605 additions and 679 deletions.
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/backup_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,8 +653,8 @@ func backupPlanHook(
ctx, span := tracing.ChildSpan(ctx, stmt.StatementTag())
defer span.Finish()

if !(p.ExtendedEvalContext().TxnImplicit || backupStmt.Options.Detached) {
return errors.Errorf("BACKUP cannot be used inside a transaction without DETACHED option")
if !(p.IsAutoCommit() || backupStmt.Options.Detached) {
return errors.Errorf("BACKUP cannot be used inside a multi-statement transaction without DETACHED option")
}

subdir, err := subdirFn()
Expand Down
6 changes: 4 additions & 2 deletions pkg/ccl/backupccl/backup_rand_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ database_name = 'rand' AND schema_name = 'public'`)
// and per-table restores) work properly with two kinds of table backups
// (full database backups and per-table backups).
for _, backup := range dbBackups {
sqlDB.Exec(t, "DROP DATABASE IF EXISTS restoredb; CREATE DATABASE restoredb")
sqlDB.Exec(t, "DROP DATABASE IF EXISTS restoredb")
sqlDB.Exec(t, "CREATE DATABASE restoredb")
if err := verifyBackupRestoreStatementResult(
t, sqlDB, "RESTORE rand.* FROM $1 WITH OPTIONS (into_db='restoredb')", backup,
); err != nil {
Expand All @@ -135,7 +136,8 @@ database_name = 'rand' AND schema_name = 'public'`)
tableNameCombos := powerset(tableNames)

for i, combo := range tableNameCombos {
sqlDB.Exec(t, "DROP DATABASE IF EXISTS restoredb; CREATE DATABASE restoredb")
sqlDB.Exec(t, "DROP DATABASE IF EXISTS restoredb")
sqlDB.Exec(t, "CREATE DATABASE restoredb")
backupTarget := fmt.Sprintf("%s%d", localFoo, i)
if len(combo) == 0 {
continue
Expand Down
229 changes: 102 additions & 127 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2413,48 +2413,37 @@ func TestBackupRestoreUserDefinedSchemas(t *testing.T) {
defer cleanupFn()

var ts1, ts2, ts3, ts4, ts5, ts6 string
sqlDB.Exec(t, `
CREATE DATABASE d;
USE d;
CREATE SCHEMA sc;
CREATE SCHEMA sc2;
CREATE TABLE d.sc.t1 (x int);
CREATE TABLE d.sc2.t1 (x bool);
`)
sqlDB.Exec(t, `CREATE DATABASE d;`)
sqlDB.Exec(t, `USE d;`)
sqlDB.Exec(t, `CREATE SCHEMA sc;`)
sqlDB.Exec(t, `CREATE SCHEMA sc2;`)
sqlDB.Exec(t, `CREATE TABLE d.sc.t1 (x int);`)
sqlDB.Exec(t, `CREATE TABLE d.sc2.t1 (x bool);`)
sqlDB.QueryRow(t, `SELECT cluster_logical_timestamp()`).Scan(&ts1)

sqlDB.Exec(t, `
ALTER SCHEMA sc RENAME TO sc3;
ALTER SCHEMA sc2 RENAME TO sc;
`)
sqlDB.Exec(t, `ALTER SCHEMA sc RENAME TO sc3;`)
sqlDB.Exec(t, `ALTER SCHEMA sc2 RENAME TO sc;`)
sqlDB.QueryRow(t, `SELECT cluster_logical_timestamp()`).Scan(&ts2)

sqlDB.Exec(t, `
DROP TABLE sc.t1;
DROP TABLE sc3.t1;
DROP SCHEMA sc;
DROP SCHEMA sc3;
`)
sqlDB.Exec(t, `DROP TABLE sc.t1;`)
sqlDB.Exec(t, `DROP TABLE sc3.t1;`)
sqlDB.Exec(t, `DROP SCHEMA sc;`)
sqlDB.Exec(t, `DROP SCHEMA sc3;`)
sqlDB.QueryRow(t, `SELECT cluster_logical_timestamp()`).Scan(&ts3)

sqlDB.Exec(t, `
CREATE SCHEMA sc;
CREATE TABLE sc.t1 (a STRING);
sqlDB.Exec(t, `CREATE SCHEMA sc;`)
sqlDB.Exec(t, `CREATE TABLE sc.t1 (a STRING);
`)
sqlDB.QueryRow(t, `SELECT cluster_logical_timestamp()`).Scan(&ts4)
sqlDB.Exec(t, `BACKUP DATABASE d TO 'nodelocal://0/rev-history-backup' WITH revision_history`)

sqlDB.Exec(t, `
DROP TABLE sc.t1;
DROP SCHEMA sc;
sqlDB.Exec(t, `DROP TABLE sc.t1;`)
sqlDB.Exec(t, `DROP SCHEMA sc;
`)
sqlDB.QueryRow(t, `SELECT cluster_logical_timestamp()`).Scan(&ts5)

sqlDB.Exec(t, `
CREATE SCHEMA sc;
CREATE TABLE sc.t1 (a FLOAT);
`)
sqlDB.Exec(t, `CREATE SCHEMA sc;`)
sqlDB.Exec(t, `CREATE TABLE sc.t1 (a FLOAT);`)
sqlDB.QueryRow(t, `SELECT cluster_logical_timestamp()`).Scan(&ts6)
sqlDB.Exec(t, `BACKUP DATABASE d TO 'nodelocal://0/rev-history-backup' WITH revision_history`)

Expand Down Expand Up @@ -2500,17 +2489,15 @@ CREATE TABLE sc.t1 (a FLOAT);
t.Run("full-cluster", func(t *testing.T) {
_, sqlDB, dataDir, cleanupFn := backupRestoreTestSetup(t, singleNode, 0, InitManualReplication)
defer cleanupFn()
sqlDB.Exec(t, `
CREATE DATABASE d;
USE d;
CREATE SCHEMA unused;
CREATE SCHEMA sc;
CREATE TABLE sc.tb1 (x INT);
INSERT INTO sc.tb1 VALUES (1);
CREATE TYPE sc.typ1 AS ENUM ('hello');
CREATE TABLE sc.tb2 (x sc.typ1);
INSERT INTO sc.tb2 VALUES ('hello');
`)
sqlDB.Exec(t, `CREATE DATABASE d;`)
sqlDB.Exec(t, `USE d;`)
sqlDB.Exec(t, `CREATE SCHEMA unused;`)
sqlDB.Exec(t, `CREATE SCHEMA sc;`)
sqlDB.Exec(t, `CREATE TABLE sc.tb1 (x INT);`)
sqlDB.Exec(t, `INSERT INTO sc.tb1 VALUES (1);`)
sqlDB.Exec(t, `CREATE TYPE sc.typ1 AS ENUM ('hello');`)
sqlDB.Exec(t, `CREATE TABLE sc.tb2 (x sc.typ1);`)
sqlDB.Exec(t, `INSERT INTO sc.tb2 VALUES ('hello');`)
// Now backup the full cluster.
sqlDB.Exec(t, `BACKUP TO 'nodelocal://0/test/'`)
// Start a new server that shares the data directory.
Expand All @@ -2535,17 +2522,15 @@ INSERT INTO sc.tb2 VALUES ('hello');
_, sqlDB, _, cleanupFn := backupRestoreTestSetup(t, singleNode, 0, InitManualReplication)
defer cleanupFn()

sqlDB.Exec(t, `
CREATE DATABASE d;
USE d;
CREATE SCHEMA sc;
CREATE SCHEMA unused;
CREATE TABLE sc.tb1 (x INT);
INSERT INTO sc.tb1 VALUES (1);
CREATE TYPE sc.typ1 AS ENUM ('hello');
CREATE TABLE sc.tb2 (x sc.typ1);
INSERT INTO sc.tb2 VALUES ('hello');
`)
sqlDB.Exec(t, `CREATE DATABASE d;`)
sqlDB.Exec(t, `USE d;`)
sqlDB.Exec(t, `CREATE SCHEMA sc;`)
sqlDB.Exec(t, `CREATE SCHEMA unused;`)
sqlDB.Exec(t, `CREATE TABLE sc.tb1 (x INT);`)
sqlDB.Exec(t, `INSERT INTO sc.tb1 VALUES (1);`)
sqlDB.Exec(t, `CREATE TYPE sc.typ1 AS ENUM ('hello');`)
sqlDB.Exec(t, `CREATE TABLE sc.tb2 (x sc.typ1);`)
sqlDB.Exec(t, `INSERT INTO sc.tb2 VALUES ('hello');`)
// Backup the database.
sqlDB.Exec(t, `BACKUP DATABASE d TO 'nodelocal://0/test/'`)

Expand All @@ -2569,25 +2554,23 @@ INSERT INTO sc.tb2 VALUES ('hello');
_, sqlDB, _, cleanupFn := backupRestoreTestSetup(t, singleNode, 0, InitManualReplication)
defer cleanupFn()

sqlDB.Exec(t, `
CREATE TABLE table_in_data (x INT);
sqlDB.Exec(t, `CREATE TABLE table_in_data (x INT);`)

CREATE SCHEMA data;
CREATE TABLE data.tb1 (x INT);
sqlDB.Exec(t, `CREATE SCHEMA data;`)
sqlDB.Exec(t, `CREATE TABLE data.tb1 (x INT);`)

CREATE DATABASE foo;
USE foo;
CREATE SCHEMA schema_in_foo;
CREATE TABLE schema_in_foo.tb1 (x INT);
sqlDB.Exec(t, `CREATE DATABASE foo;`)
sqlDB.Exec(t, `USE foo;`)
sqlDB.Exec(t, `CREATE SCHEMA schema_in_foo;`)
sqlDB.Exec(t, `CREATE TABLE schema_in_foo.tb1 (x INT);`)

CREATE SCHEMA schema_in_foo2;
CREATE TABLE schema_in_foo2.tb1 (x INT);
sqlDB.Exec(t, `CREATE SCHEMA schema_in_foo2;`)
sqlDB.Exec(t, `CREATE TABLE schema_in_foo2.tb1 (x INT);`)

CREATE SCHEMA foo;
CREATE TABLE foo.tb1 (x INT);
sqlDB.Exec(t, `CREATE SCHEMA foo;`)
sqlDB.Exec(t, `CREATE TABLE foo.tb1 (x INT);`)

CREATE TABLE tb2 (y INT);
`)
sqlDB.Exec(t, `CREATE TABLE tb2 (y INT);`)

for _, tc := range []struct {
name string
Expand Down Expand Up @@ -2635,16 +2618,14 @@ table_name from [SHOW TABLES FROM restore] ORDER BY schema_name, table_name`, tc
_, sqlDB, _, cleanupFn := backupRestoreTestSetup(t, singleNode, 0, InitManualReplication)
defer cleanupFn()

sqlDB.Exec(t, `
CREATE DATABASE d;
USE d;
CREATE SCHEMA sc;
CREATE TYPE sc.typ1 AS ENUM ('hello');
CREATE TABLE sc.tb1 (x sc.typ1);
INSERT INTO sc.tb1 VALUES ('hello');
CREATE TABLE sc.tb2 (x INT);
INSERT INTO sc.tb2 VALUES (1);
`)
sqlDB.Exec(t, `CREATE DATABASE d;`)
sqlDB.Exec(t, `USE d;`)
sqlDB.Exec(t, `CREATE SCHEMA sc;`)
sqlDB.Exec(t, `CREATE TYPE sc.typ1 AS ENUM ('hello');`)
sqlDB.Exec(t, `CREATE TABLE sc.tb1 (x sc.typ1);`)
sqlDB.Exec(t, `INSERT INTO sc.tb1 VALUES ('hello');`)
sqlDB.Exec(t, `CREATE TABLE sc.tb2 (x INT);`)
sqlDB.Exec(t, `INSERT INTO sc.tb2 VALUES (1);`)
{
// We have to qualify the table correctly to back it up. d.tb1 resolves
// to d.public.tb1.
Expand Down Expand Up @@ -2693,25 +2674,23 @@ INSERT INTO sc.tb2 VALUES (1);
defer cleanupFn()
kvDB := tc.Server(0).DB()

sqlDB.Exec(t, `
CREATE DATABASE d1;
USE d1;
CREATE SCHEMA sc1;
CREATE TABLE sc1.tb (x INT);
INSERT INTO sc1.tb VALUES (1);
CREATE SCHEMA sc2;
CREATE TABLE sc2.tb (x INT);
INSERT INTO sc2.tb VALUES (2);
CREATE DATABASE d2;
USE d2;
CREATE SCHEMA sc3;
CREATE TABLE sc3.tb (x INT);
INSERT INTO sc3.tb VALUES (3);
CREATE SCHEMA sc4;
CREATE TABLE sc4.tb (x INT);
INSERT INTO sc4.tb VALUES (4);
`)
sqlDB.Exec(t, `CREATE DATABASE d1;`)
sqlDB.Exec(t, `USE d1;`)
sqlDB.Exec(t, `CREATE SCHEMA sc1;`)
sqlDB.Exec(t, `CREATE TABLE sc1.tb (x INT);`)
sqlDB.Exec(t, `INSERT INTO sc1.tb VALUES (1);`)
sqlDB.Exec(t, `CREATE SCHEMA sc2;`)
sqlDB.Exec(t, `CREATE TABLE sc2.tb (x INT);`)
sqlDB.Exec(t, `INSERT INTO sc2.tb VALUES (2);`)

sqlDB.Exec(t, `CREATE DATABASE d2;`)
sqlDB.Exec(t, `USE d2;`)
sqlDB.Exec(t, `CREATE SCHEMA sc3;`)
sqlDB.Exec(t, `CREATE TABLE sc3.tb (x INT);`)
sqlDB.Exec(t, `INSERT INTO sc3.tb VALUES (3);`)
sqlDB.Exec(t, `CREATE SCHEMA sc4;`)
sqlDB.Exec(t, `CREATE TABLE sc4.tb (x INT);`)
sqlDB.Exec(t, `INSERT INTO sc4.tb VALUES (4);`)
{
// Backup all databases.
sqlDB.Exec(t, `BACKUP DATABASE d1, d2 TO 'nodelocal://0/test/'`)
Expand Down Expand Up @@ -2751,14 +2730,12 @@ INSERT INTO sc4.tb VALUES (4);
_, sqlDB, _, cleanupFn := backupRestoreTestSetup(t, singleNode, 0, InitManualReplication)
defer cleanupFn()

sqlDB.Exec(t, `
CREATE DATABASE d;
USE d;
CREATE SCHEMA sc;
CREATE TYPE sc.typ1 AS ENUM ('hello');
CREATE TABLE sc.tb1 (x sc.typ1);
INSERT INTO sc.tb1 VALUES ('hello');
`)
sqlDB.Exec(t, `CREATE DATABASE d;`)
sqlDB.Exec(t, `USE d;`)
sqlDB.Exec(t, `CREATE SCHEMA sc;`)
sqlDB.Exec(t, `CREATE TYPE sc.typ1 AS ENUM ('hello');`)
sqlDB.Exec(t, `CREATE TABLE sc.tb1 (x sc.typ1);`)
sqlDB.Exec(t, `INSERT INTO sc.tb1 VALUES ('hello');`)
// Take a backup.
sqlDB.Exec(t, `BACKUP TABLE d.sc.tb1 TO 'nodelocal://0/test/'`)
// Now drop the table.
Expand Down Expand Up @@ -4281,7 +4258,9 @@ func TestRestoreAsOfSystemTime(t *testing.T) {
t, fmt.Sprintf(`RESTORE data.* FROM $1 AS OF SYSTEM TIME %s WITH into_db='err'`, i),
latestBackup,
)
sqlDB.Exec(t, `DROP DATABASE err; CREATE DATABASE err`)
sqlDB.Exec(t, `DROP DATABASE err`)
sqlDB.Exec(t, `CREATE DATABASE err`)

} else {
sqlDB.ExpectErr(
t, "invalid RESTORE timestamp",
Expand All @@ -4299,7 +4278,8 @@ func TestRestoreAsOfSystemTime(t *testing.T) {
t, fmt.Sprintf(`RESTORE data.* FROM $1, $2, $3 AS OF SYSTEM TIME %s WITH into_db='err'`, i),
latestBackup, incLatestBackup, inc2LatestBackup,
)
sqlDB.Exec(t, `DROP DATABASE err; CREATE DATABASE err`)
sqlDB.Exec(t, `DROP DATABASE err`)
sqlDB.Exec(t, `CREATE DATABASE err`)
} else {
sqlDB.ExpectErr(
t, "invalid RESTORE timestamp",
Expand Down Expand Up @@ -5548,7 +5528,7 @@ func TestDetachedBackup(t *testing.T) {
return tx.QueryRow(`BACKUP DATABASE data TO $1`, localFoo).Scan(&jobID)
})
require.True(t, testutils.IsError(err,
"BACKUP cannot be used inside a transaction without DETACHED option"))
"BACKUP cannot be used inside a multi-statement transaction without DETACHED option"))

// Okay to run DETACHED backup, even w/out explicit transaction.
sqlDB.QueryRow(t, `BACKUP DATABASE data TO $1 WITH DETACHED`, localFoo).Scan(&jobID)
Expand Down Expand Up @@ -5602,7 +5582,7 @@ func TestDetachedRestore(t *testing.T) {
return tx.QueryRow(`RESTORE TABLE t FROM $1 WITH INTO_DB=test`, localFoo).Scan(&jobID)
})
require.True(t, testutils.IsError(err,
"RESTORE cannot be used inside a transaction without DETACHED option"))
"RESTORE cannot be used inside a multi-statement transaction without DETACHED option"))

// Okay to run DETACHED RESTORE, even w/out explicit transaction.
sqlDB.QueryRow(t, `RESTORE TABLE t FROM $1 WITH DETACHED, INTO_DB=test`,
Expand Down Expand Up @@ -9277,11 +9257,9 @@ func TestDroppedDescriptorRevisionAndSystemDBIDClash(t *testing.T) {
_, sqlDB, tempDir, cleanupFn := backupRestoreTestSetup(t, singleNode, 1, InitManualReplication)
defer cleanupFn()

sqlDB.Exec(t, `
CREATE TABLE foo (id INT);
BACKUP TO 'nodelocal://0/foo' WITH revision_history;
DROP TABLE foo;
`)
sqlDB.Exec(t, `CREATE TABLE foo (id INT);`)
sqlDB.Exec(t, `BACKUP TO 'nodelocal://0/foo' WITH revision_history;`)
sqlDB.Exec(t, `DROP TABLE foo;`)

var aost string
sqlDB.QueryRow(t, "SELECT cluster_logical_timestamp()").Scan(&aost)
Expand Down Expand Up @@ -9357,15 +9335,13 @@ func TestRestoreRemappingOfExistingUDTInColExpr(t *testing.T) {
_, sqlDB, _, cleanupFn := backupRestoreTestSetup(t, singleNode, numAccounts, InitManualReplication)
defer cleanupFn()

sqlDB.Exec(t, `
CREATE TYPE status AS ENUM ('open', 'closed', 'inactive');
CREATE TABLE foo (id INT PRIMARY KEY, what status default 'open');
BACKUP DATABASE data to 'nodelocal://0/foo';
DROP TABLE foo CASCADE;
DROP TYPE status;
CREATE TYPE status AS ENUM ('open', 'closed', 'inactive');
RESTORE TABLE foo FROM 'nodelocal://0/foo';
`)
sqlDB.Exec(t, `CREATE TYPE status AS ENUM ('open', 'closed', 'inactive');`)
sqlDB.Exec(t, `CREATE TABLE foo (id INT PRIMARY KEY, what status default 'open');`)
sqlDB.Exec(t, `BACKUP DATABASE data to 'nodelocal://0/foo';`)
sqlDB.Exec(t, `DROP TABLE foo CASCADE;`)
sqlDB.Exec(t, `DROP TYPE status;`)
sqlDB.Exec(t, `CREATE TYPE status AS ENUM ('open', 'closed', 'inactive');`)
sqlDB.Exec(t, `RESTORE TABLE foo FROM 'nodelocal://0/foo';`)
}

// TestGCDropIndexSpanExpansion is a regression test for
Expand Down Expand Up @@ -9398,13 +9374,12 @@ func TestGCDropIndexSpanExpansion(t *testing.T) {
sqlRunner := sqlutils.MakeSQLRunner(conn)
sqlRunner.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'`) // speeds up the test

sqlRunner.Exec(t, `
CREATE DATABASE test; USE test;
CREATE TABLE foo (id INT PRIMARY KEY, id2 INT, id3 INT, INDEX bar (id2), INDEX baz(id3));
ALTER INDEX foo@bar CONFIGURE ZONE USING gc.ttlseconds = '1';
INSERT INTO foo VALUES (1, 2, 3);
DROP INDEX foo@bar;
`)
sqlRunner.Exec(t, `CREATE DATABASE test;`)
sqlRunner.Exec(t, ` USE test;`)
sqlRunner.Exec(t, `CREATE TABLE foo (id INT PRIMARY KEY, id2 INT, id3 INT, INDEX bar (id2), INDEX baz(id3));`)
sqlRunner.Exec(t, `ALTER INDEX foo@bar CONFIGURE ZONE USING gc.ttlseconds = '1';`)
sqlRunner.Exec(t, `INSERT INTO foo VALUES (1, 2, 3);`)
sqlRunner.Exec(t, `DROP INDEX foo@bar;`)

// Wait until the index is about to get gc'ed.
<-aboutToGC
Expand Down
Loading

0 comments on commit cfe38f2

Please sign in to comment.