Merge #78512
78512: sql: block operations properly on implicit transactions r=fqazi a=fqazi

Previously, an enhancement allowed implicit transactions to
execute multiple statements, which broke code that blocks
certain operations inside implicit transactions. This was
problematic because those checks really care that only a
single statement is being executed, not that the transaction
is implicit. For example, after that enhancement the
declarative schema changer and the legacy schema changer
could be mixed within a single implicit transaction. To
address this, this patch changes the places that check for
implicit transactions to instead check for single-statement
transactions.

Release note: None


Co-authored-by: Faizan Qazi <[email protected]>
craig[bot] and fqazi committed Apr 2, 2022
2 parents 2ecd90c + 6ba4c1e commit 71e32a6
Showing 76 changed files with 366 additions and 116 deletions.
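The core of the change, per the commit message, is a one-line swap in each guard: instead of asking whether the statement runs as an implicit transaction (p.IsAutoCommit()), the code now asks whether the transaction executes exactly one statement (p.ExtendedEvalContext().TxnIsSingleStmt). Below is a minimal Go sketch of that pattern; the planner interface and method names are illustrative stand-ins rather than the actual CockroachDB interfaces, and only the shape of the check is taken from the diffs that follow.

// Minimal sketch of the guard pattern this commit applies. The planner
// interface and method names here are illustrative stand-ins; only the
// shape of the check mirrors the diffs below.
package example

import "github.com/cockroachdb/errors"

type planner interface {
	IsAutoCommit() bool    // true when the statement runs in an implicit txn
	TxnIsSingleStmt() bool // true when the txn executes exactly one statement
}

// Old guard: only rejects explicit transactions, so a multi-statement
// implicit transaction slips through.
func checkBackupTxnOld(p planner, detached bool) error {
	if !(p.IsAutoCommit() || detached) {
		return errors.Errorf("BACKUP cannot be used inside a multi-statement transaction without DETACHED option")
	}
	return nil
}

// New guard: rejects any transaction that runs more than one statement,
// whether it is implicit or explicit.
func checkBackupTxnNew(p planner, detached bool) error {
	if !(p.TxnIsSingleStmt() || detached) {
		return errors.Errorf("BACKUP cannot be used inside a multi-statement transaction without DETACHED option")
	}
	return nil
}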
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_planning.go
Original file line number Diff line number Diff line change
@@ -668,7 +668,7 @@ func backupPlanHook(
ctx, span := tracing.ChildSpan(ctx, stmt.StatementTag())
defer span.Finish()

if !(p.IsAutoCommit() || backupStmt.Options.Detached) {
if !(p.ExtendedEvalContext().TxnIsSingleStmt || backupStmt.Options.Detached) {
return errors.Errorf("BACKUP cannot be used inside a multi-statement transaction without DETACHED option")
}

2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/restore_planning.go
Original file line number Diff line number Diff line change
@@ -1149,7 +1149,7 @@ func restorePlanHook(
ctx, span := tracing.ChildSpan(ctx, stmt.StatementTag())
defer span.Finish()

if !(p.IsAutoCommit() || restoreStmt.Options.Detached) {
if !(p.ExtendedEvalContext().TxnIsSingleStmt || restoreStmt.Options.Detached) {
return errors.Errorf("RESTORE cannot be used inside a multi-statement transaction without DETACHED option")
}

11 changes: 6 additions & 5 deletions pkg/ccl/backupccl/show_test.go
Original file line number Diff line number Diff line change
@@ -50,13 +50,14 @@ func TestShowBackup(t *testing.T) {
_, sqlDBRestore, cleanupEmptyCluster := backupRestoreTestSetupEmpty(t, singleNode, tempDir, InitManualReplication, base.TestClusterArgs{})
defer cleanupFn()
defer cleanupEmptyCluster()
sqlDB.Exec(t, `
sqlDB.ExecMultiple(t, strings.Split(`
SET CLUSTER SETTING sql.cross_db_fks.enabled = TRUE;
CREATE TYPE data.welcome AS ENUM ('hello', 'hi');
USE data; CREATE SCHEMA sc;
CREATE TABLE data.sc.t1 (a INT);
CREATE TABLE data.sc.t2 (a data.welcome);
`)
`,
`;`)...)

const full, inc, inc2 = localFoo + "/full", localFoo + "/inc", localFoo + "/inc2"

@@ -768,13 +769,13 @@ func TestShowBackupWithDebugIDs(t *testing.T) {
defer cleanupFn()

// add 1 type, 1 schema, and 2 tables to the database
sqlDB.Exec(t, `
sqlDB.ExecMultiple(t, strings.Split(`
SET CLUSTER SETTING sql.cross_db_fks.enabled = TRUE;
CREATE TYPE data.welcome AS ENUM ('hello', 'hi');
USE data; CREATE SCHEMA sc;
CREATE TABLE data.sc.t1 (a INT);
CREATE TABLE data.sc.t2 (a data.welcome);
`)
CREATE TABLE data.sc.t2 (a data.welcome);`,
`;`)...)

const full = localFoo + "/full"

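Many of the test updates in this commit follow the same mechanical pattern as the show_test.go hunks above: a setup script that used to be sent to Exec as one multi-statement string is split on ";" and run statement by statement via ExecMultiple, presumably so that statements that refuse to run in a multi-statement transaction (such as SET CLUSTER SETTING) still execute as single-statement implicit transactions under the new check. A rough sketch of that pattern, with a hypothetical runner interface standing in for sqlutils.SQLRunner:

// Rough sketch of the test-helper pattern: split a multi-statement setup
// script on ";" and execute each statement separately, mirroring
// ExecMultiple(t, strings.Split(setup, ";")...). The runner interface is a
// hypothetical stand-in for sqlutils.SQLRunner.
package example

import (
	"strings"
	"testing"
)

type runner interface {
	Exec(t *testing.T, query string, args ...interface{})
}

// runSetup executes each fragment of setup as its own single-statement
// implicit transaction; blank fragments (e.g. after a trailing ";") are
// skipped here for clarity.
func runSetup(t *testing.T, r runner, setup string) {
	for _, stmt := range strings.Split(setup, ";") {
		if strings.TrimSpace(stmt) == "" {
			continue
		}
		r.Exec(t, stmt)
	}
}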
Original file line number Diff line number Diff line change
@@ -7,7 +7,13 @@ new-server name=s1

exec-sql
SET use_declarative_schema_changer = 'off';
----

exec-sql
SET CLUSTER SETTING jobs.debug.pausepoints = 'schemachanger.before.exec';
----

exec-sql
CREATE DATABASE d;
CREATE TABLE d.foo (id INT);
----
@@ -118,6 +124,9 @@ CREATE TABLE d2.s.t (id INT);

exec-sql
SET use_declarative_schema_changer = 'off';
----

exec-sql
SET CLUSTER SETTING jobs.debug.pausepoints = 'schemachanger.before.exec';
----

6 changes: 6 additions & 0 deletions pkg/ccl/backupccl/testdata/backup-restore/column-families
Original file line number Diff line number Diff line change
@@ -24,7 +24,13 @@ ALTER TABLE cfs SPLIT AT SELECT a FROM cfs;
exec-sql
-- Split the output files very small to catch output SSTs mid-row.
SET CLUSTER SETTING bulkio.backup.file_size = '1';
----

exec-sql
SET CLUSTER SETTING kv.bulk_sst.target_size = '1';
----

exec-sql
SET CLUSTER SETTING bulkio.backup.merge_file_buffer_size = '1MiB';
----

3 changes: 3 additions & 0 deletions pkg/ccl/backupccl/testdata/backup-restore/max-row-size
Original file line number Diff line number Diff line change
@@ -46,6 +46,9 @@ pq: row larger than max row size: table 112 family 0 primary key /Table/112/1/2/

exec-sql
SET CLUSTER SETTING sql.guardrails.max_row_size_err = DEFAULT;
----

exec-sql
INSERT INTO d2.maxrow VALUES (2, repeat('y', 20000));
----

Original file line number Diff line number Diff line change
@@ -18,6 +18,9 @@ BACKUP DATABASE d INTO 'userfile:///foo'
# Attempt the restore but pause it before publishing descriptors.
exec-sql
SET CLUSTER SETTING jobs.debug.pausepoints = 'restore.before_publishing_descriptors';
----

exec-sql
DROP DATABASE d;
----

@@ -52,7 +55,13 @@ BACKUP DATABASE d INTO 'userfile:///foo'
# Attempt the restore but pause it after publishing descriptors.
exec-sql
SET CLUSTER SETTING jobs.debug.pausepoints = '';
----

exec-sql
SET CLUSTER SETTING jobs.debug.pausepoints = 'restore.after_publishing_descriptors';
----

exec-sql
DROP DATABASE d;
----

7 changes: 3 additions & 4 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
@@ -374,8 +374,7 @@ func TestChangefeedTenants(t *testing.T) {

tenantServer, tenantDB := serverutils.StartTenant(t, kvServer, tenantArgs)
tenantSQL := sqlutils.MakeSQLRunner(tenantDB)
tenantSQL.Exec(t, serverSetupStatements)

tenantSQL.ExecMultiple(t, strings.Split(serverSetupStatements, ";")...)
tenantSQL.Exec(t, `CREATE TABLE foo_in_tenant (pk INT PRIMARY KEY)`)
t.Run("changefeed on non-tenant table fails", func(t *testing.T) {
kvSQL := sqlutils.MakeSQLRunner(kvSQLdb)
@@ -445,7 +444,7 @@ func TestChangefeedTenantsExternalIOEnabled(t *testing.T) {

tenantServer, tenantDB := serverutils.StartTenant(t, kvServer, tenantArgs)
tenantSQL := sqlutils.MakeSQLRunner(tenantDB)
tenantSQL.Exec(t, serverSetupStatements)
tenantSQL.ExecMultiple(t, strings.Split(serverSetupStatements, ";")...)
tenantSQL.Exec(t, `CREATE TABLE foo_in_tenant (pk INT PRIMARY KEY)`)

t.Run("sinkful changefeed works", func(t *testing.T) {
@@ -952,7 +951,7 @@ func TestChangefeedExternalIODisabled(t *testing.T) {
})
defer s.Stopper().Stop(ctx)
sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, serverSetupStatements)
sqlDB.ExecMultiple(t, strings.Split(serverSetupStatements, ";")...)
sqlDB.Exec(t, "CREATE TABLE target_table (pk INT PRIMARY KEY)")
for _, proto := range disallowedSinkProtos {
sqlDB.ExpectErr(t, "Outbound IO is disabled by configuration, cannot create changefeed",
14 changes: 6 additions & 8 deletions pkg/ccl/changefeedccl/helpers_test.go
Original file line number Diff line number Diff line change
@@ -359,8 +359,8 @@ func startTestFullServer(
}
}()

_, err = db.ExecContext(ctx, serverSetupStatements)
require.NoError(t, err)
sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.ExecMultiple(t, strings.Split(serverSetupStatements, ";")...)

if region := serverArgsRegion(args); region != "" {
_, err = db.ExecContext(ctx, fmt.Sprintf(`ALTER DATABASE d PRIMARY REGION "%s"`, region))
@@ -399,8 +399,8 @@ func startTestCluster(t testing.TB) (serverutils.TestClusterInterface, *gosql.DB
require.NoError(t, err)
}
}()
_, err = db.ExecContext(ctx, serverSetupStatements)
require.NoError(t, err)
sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.ExecMultiple(t, strings.Split(serverSetupStatements, ";")...)

_, err = db.ExecContext(ctx, `ALTER DATABASE d PRIMARY REGION "us-east1"`)
return cluster, db, cleanupAndReset
@@ -409,8 +409,6 @@ func startTestCluster(t testing.TB) (serverutils.TestClusterInterface, *gosql.DB
func startTestTenant(
t testing.TB, options feedTestOptions,
) (serverutils.TestServerInterface, *gosql.DB, func()) {
ctx := context.Background()

kvServer, _, cleanupCluster := startTestFullServer(t, options)
knobs := base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{Changefeed: &TestingKnobs{}},
@@ -432,8 +430,8 @@ func startTestTenant(

tenantServer, tenantDB := serverutils.StartTenant(t, kvServer, tenantArgs)
// Re-run setup on the tenant as well
_, err := tenantDB.ExecContext(ctx, serverSetupStatements)
require.NoError(t, err)
tenantRunner := sqlutils.MakeSQLRunner(tenantDB)
tenantRunner.ExecMultiple(t, strings.Split(serverSetupStatements, ";")...)

server := &testServerShim{tenantServer, kvServer}
// Log so that it is clear if a failed test happened
Original file line number Diff line number Diff line change
@@ -140,6 +140,9 @@ reset-matching-stmt-for-tracing
# Set a super high closed bounded staleness target and execute a schema change.
exec
SET CLUSTER SETTING kv.closed_timestamp.target_duration = '1hr';
----

exec
ALTER TABLE t ADD COLUMN new_col INT NOT NULL DEFAULT 2
----

Original file line number Diff line number Diff line change
@@ -2,6 +2,8 @@

statement ok
SET CLUSTER SETTING sql.defaults.primary_region = "invalid-region-name";

statement ok
SET CLUSTER SETTING sql.multiregion.drop_primary_region.enabled = 'off'

# Test invalid default primary region name.
Original file line number Diff line number Diff line change
@@ -82,6 +82,8 @@ CREATE TABLE team (
dislikes string[],
FAMILY "primary" (id, name, likes, dislikes)
);

statement ok
IMPORT INTO team CSV DATA ('nodelocal://1/team_export/export*.csv') WITH DELIMITER = '|'

query ITTT colnames
Original file line number Diff line number Diff line change
@@ -5,6 +5,8 @@
# we need to wait for the system config to propagate.
statement ok
SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '10ms';

statement ok
SET CLUSTER SETTING kv.closed_timestamp.target_duration = '10ms';

statement ok
Original file line number Diff line number Diff line change
@@ -3,6 +3,8 @@

statement ok
SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '10ms';

statement ok
SET CLUSTER SETTING kv.closed_timestamp.target_duration = '10ms';

statement ok
Original file line number Diff line number Diff line change
@@ -6,6 +6,8 @@
# we need to wait for the system config to propagate.
statement ok
SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '10ms';

statement ok
SET CLUSTER SETTING kv.closed_timestamp.target_duration = '10ms';

statement ok
@@ -1280,6 +1282,8 @@ CREATE INDEX new_idx ON regional_by_row_table(a, b)
# #56201).
statement ok
CREATE TABLE t56201 (a INT, b STRING, c STRING NOT NULL) LOCALITY REGIONAL BY ROW;

statement ok
ALTER TABLE t56201 INJECT STATISTICS '[
{
"columns": ["a"],
@@ -1301,6 +1305,8 @@ ALTER TABLE t56201 INJECT STATISTICS '[
"created_at": "2018-01-01 1:00:00.00000+00:00"
}
]';

statement ok
ALTER TABLE t56201 ADD CONSTRAINT key_a_b UNIQUE (a, b);

query T
16 changes: 10 additions & 6 deletions pkg/ccl/multiregionccl/datadriven_test.go
Original file line number Diff line number Diff line change
@@ -171,12 +171,16 @@ func TestMultiRegionDataDriven(t *testing.T) {
// Speed up closing of timestamps, in order to sleep less below before
// we can use follower_read_timestamp(). follower_read_timestamp() uses
// sum of the following settings.
_, err = sqlConn.Exec(
"SET CLUSTER SETTING kv.closed_timestamp.target_duration = '0.4s';" +
"SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '0.1s';" +
"SET CLUSTER SETTING kv.closed_timestamp.propagation_slack = '0.5s'")
if err != nil {
return err.Error()
for _, stmt := range strings.Split(`
SET CLUSTER SETTING kv.closed_timestamp.target_duration = '0.4s';
SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '0.1s';
SET CLUSTER SETTING kv.closed_timestamp.propagation_slack = '0.5s'
`,
";") {
_, err = sqlConn.Exec(stmt)
if err != nil {
return err.Error()
}
}

case "cleanup-cluster":
6 changes: 3 additions & 3 deletions pkg/ccl/multiregionccl/region_test.go
Original file line number Diff line number Diff line change
@@ -401,10 +401,10 @@ ALTER TABLE db.public.global CONFIGURE ZONE USING

_, err := sqlDB.Exec(`
CREATE DATABASE db PRIMARY REGION "us-east1" REGIONS "us-east2";
CREATE TABLE db.global () LOCALITY GLOBAL;
SET CLUSTER SETTING sql.defaults.multiregion_placement_policy.enabled = true;`)
CREATE TABLE db.global () LOCALITY GLOBAL;`)
require.NoError(t, err)
_, err = sqlDB.Exec(`SET CLUSTER SETTING sql.defaults.multiregion_placement_policy.enabled = true;`)
require.NoError(t, err)

go func() {
defer func() {
close(regionOpFinished)
Original file line number Diff line number Diff line change
@@ -100,6 +100,9 @@ block-gc-jobs
# with the mvcc-index-backfiller.
exec-sql
SET CLUSTER SETTING sql.mvcc_compliant_index_creation.enabled = true;
----

exec-sql
CREATE INDEX idx2 ON db.t (j);
ALTER INDEX db.t@idx2 CONFIGURE ZONE USING gc.ttlseconds = 1;
----
@@ -134,6 +137,9 @@ translate database=db table=t
# with the non-mvcc-index-backfiller.
exec-sql
SET CLUSTER SETTING sql.mvcc_compliant_index_creation.enabled = false;
----

exec-sql
CREATE INDEX idx3 ON db.t (j);
ALTER INDEX db.t@idx3 CONFIGURE ZONE USING gc.ttlseconds = 1;
----
@@ -153,6 +159,9 @@ translate database=db table=t
# zone configuration should also be cleaned up.
exec-sql
SET CLUSTER SETTING sql.mvcc_compliant_index_creation.enabled = true;
----

exec-sql
CREATE TABLE db.t2(i INT PRIMARY KEY, j INT);
CREATE INDEX idx ON db.t2 (j);
ALTER INDEX db.t2@idx CONFIGURE ZONE USING gc.ttlseconds = 1;
@@ -165,6 +174,9 @@ translate database=db table=t2

exec-sql
SET CLUSTER SETTING sql.mvcc_compliant_index_creation.enabled = false;
----

exec-sql
CREATE TABLE db.t3(i INT PRIMARY KEY, j INT);
CREATE INDEX idx ON db.t3 (j);
ALTER INDEX db.t3@idx CONFIGURE ZONE USING gc.ttlseconds = 1;
13 changes: 8 additions & 5 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@ package streamingest
import (
"context"
"net/url"
"strings"
"testing"
"time"

@@ -67,26 +68,28 @@ func TestTenantStreaming(t *testing.T) {
resetFreq := changefeedbase.TestingSetDefaultMinCheckpointFrequency(50 * time.Millisecond)
defer resetFreq()
// Set required cluster settings.
_, err := sourceDB.Exec(`
sourceDBRunner := sqlutils.MakeSQLRunner(sourceDB)
sourceDBRunner.ExecMultiple(t, strings.Split(`
SET CLUSTER SETTING kv.rangefeed.enabled = true;
SET CLUSTER SETTING kv.closed_timestamp.target_duration = '1s';
SET CLUSTER SETTING changefeed.experimental_poll_interval = '10ms';
SET CLUSTER SETTING stream_replication.min_checkpoint_frequency = '1s';
`)
require.NoError(t, err)
`,
";")...)

// Start the destination server.
hDest, cleanupDest := streamingtest.NewReplicationHelper(t, base.TestServerArgs{})
defer cleanupDest()
// destSQL refers to the system tenant as that's the one that's running the
// job.
destSQL := hDest.SysDB
destSQL.Exec(t, `
destSQL.ExecMultiple(t, strings.Split(`
SET CLUSTER SETTING stream_replication.consumer_heartbeat_frequency = '2s';
SET CLUSTER SETTING bulkio.stream_ingestion.minimum_flush_interval = '5us';
SET CLUSTER SETTING bulkio.stream_ingestion.cutover_signal_poll_interval = '100ms';
SET enable_experimental_stream_replication = true;
`)
`,
";")...)

// Sink to read data from.
pgURL, cleanupSink := sqlutils.PGUrl(t, source.ServingSQLAddr(), t.Name(), url.User(security.RootUser))