Skip to content

Commit

Permalink
Merge #91694 #92320
Browse files Browse the repository at this point in the history
91694: sqlliveness: embed current region into session ID r=JeffSwenson,ajwerner a=jaylim-crl

Follow up on #90408 and #91019.

Previously, the sqlliveness subsystem was using enum.One when constructing the
session ID since it doesn't have the regional data yet. This commit implements
that missing part of it by ensuring that the regional representation is
plumbed all the way to the sqlliveness subsystem whenever the SQL pod is
started, and use it to construct the session ID. This will enable our REGIONAL
BY ROW work to put the data in the right region.

The multi-region enum for the system database will be read during startup for
that to work. Since doing this already requires loading the system DB's
descriptor (which indirectly tests whether the system DB has been bootstrapped)
for the tenant, we can remove the call to checkTenantExists.

Epic: None

Release note: None

92320: sql: remove Txn() from JobExecContext r=adityamaru a=stevendanna

This removes Txn() from JobExecContext. This method would always
return a nil txn, making it a bit error prone to actually use in
jobs.

Epic: None

Release note: None

Co-authored-by: Jay <[email protected]>
Co-authored-by: Steven Danna <[email protected]>
  • Loading branch information
3 people committed Nov 23, 2022
3 parents 0d9669a + 3685865 + d640759 commit 5ebb4ca
Show file tree
Hide file tree
Showing 20 changed files with 430 additions and 123 deletions.
23 changes: 13 additions & 10 deletions pkg/ccl/backupccl/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,16 +473,19 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
// TODO(adityamaru: Break this code block into helper methods.
if details.URI == "" {
initialDetails := details
backupDetails, m, err := getBackupDetailAndManifest(
ctx, p.ExecCfg(), p.Txn(), details, p.User(), backupDest,
)
if err != nil {
if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
backupDetails, m, err := getBackupDetailAndManifest(
ctx, p.ExecCfg(), txn, details, p.User(), backupDest,
)
if err != nil {
return err
}
details = backupDetails
backupManifest = &m
return nil
}); err != nil {
return err
}
details = backupDetails
// Reset backupDetails so nobody accidentally uses it.
backupDetails = jobspb.BackupDetails{} //lint:ignore SA4006 intentionally clearing so no one uses this.
backupManifest = &m

// Now that we have resolved the details, and manifest, write a protected
// timestamp record on the backup's target spans/schema object.
Expand All @@ -497,7 +500,7 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
if details.ProtectedTimestampRecord != nil {
if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
return protectTimestampForBackup(
ctx, p.ExecCfg(), txn, b.job.ID(), m, details,
ctx, p.ExecCfg(), txn, b.job.ID(), backupManifest, details,
)
}); err != nil {
return err
Expand Down Expand Up @@ -563,7 +566,7 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
lic := utilccl.CheckEnterpriseEnabled(
p.ExecCfg().Settings, p.ExecCfg().NodeInfo.LogicalClusterID(), "",
) != nil
collectTelemetry(ctx, m, initialDetails, details, lic, b.job.ID())
collectTelemetry(ctx, backupManifest, initialDetails, details, lic, b.job.ID())
}

// For all backups, partitioned or not, the main BACKUP manifest is stored at
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/backupccl/backup_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -861,7 +861,7 @@ func logAndSanitizeBackupDestinations(ctx context.Context, backupDestinations ..

func collectTelemetry(
ctx context.Context,
backupManifest backuppb.BackupManifest,
backupManifest *backuppb.BackupManifest,
initialDetails, backupDetails jobspb.BackupDetails,
licensed bool,
jobID jobspb.JobID,
Expand Down Expand Up @@ -1156,7 +1156,7 @@ func getReintroducedSpans(
return tableSpans, nil
}

func getProtectedTimestampTargetForBackup(backupManifest backuppb.BackupManifest) *ptpb.Target {
func getProtectedTimestampTargetForBackup(backupManifest *backuppb.BackupManifest) *ptpb.Target {
if backupManifest.DescriptorCoverage == tree.AllDescriptors {
return ptpb.MakeClusterTarget()
}
Expand Down Expand Up @@ -1195,7 +1195,7 @@ func protectTimestampForBackup(
execCfg *sql.ExecutorConfig,
txn *kv.Txn,
jobID jobspb.JobID,
backupManifest backuppb.BackupManifest,
backupManifest *backuppb.BackupManifest,
backupDetails jobspb.BackupDetails,
) error {
tsToProtect := backupManifest.EndTime
Expand Down
3 changes: 0 additions & 3 deletions pkg/ccl/changefeedccl/cdceval/constraint.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,5 @@ func constrainSpansBySelectClause(
}

func schemaTS(execCtx sql.JobExecContext) hlc.Timestamp {
if execCtx.Txn() != nil {
return execCtx.Txn().ReadTimestamp()
}
return execCtx.ExecCfg().Clock.Now()
}
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/cdceval/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
// buffer using cdceval.AsStringUnredacted.
func NormalizeAndValidateSelectForTarget(
ctx context.Context,
execCtx sql.JobExecContext,
execCtx sql.PlanHookState,
desc catalog.TableDescriptor,
target jobspb.ChangefeedTargetSpecification,
sc *tree.SelectClause,
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/cdceval/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestNormalizeAndValidate(t *testing.T) {
SearchPath: sessiondata.DefaultSearchPath.GetPathArray(),
})
defer cleanup()
execCtx := p.(sql.JobExecContext)
execCtx := p.(sql.PlanHookState)

for _, tc := range []struct {
name string
Expand Down Expand Up @@ -289,7 +289,7 @@ func TestSelectClauseRequiresPrev(t *testing.T) {
SearchPath: sessiondata.DefaultSearchPath.GetPathArray(),
})
defer cleanup()
execCtx := p.(sql.JobExecContext)
execCtx := p.(sql.PlanHookState)

for _, tc := range []struct {
name string
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -897,7 +897,7 @@ func validateDetailsAndOptions(
// This method modifies passed in select clause to reflect normalization step.
func validateAndNormalizeChangefeedExpression(
ctx context.Context,
execCtx sql.JobExecContext,
execCtx sql.PlanHookState,
sc *tree.SelectClause,
descriptors map[tree.TablePattern]catalog.Descriptor,
targets []jobspb.ChangefeedTargetSpecification,
Expand Down Expand Up @@ -977,7 +977,7 @@ func (b *changefeedResumer) handleChangefeedError(
const errorFmt = "job failed (%v) but is being paused because of %s=%s"
errorMessage := fmt.Sprintf(errorFmt, changefeedErr,
changefeedbase.OptOnError, changefeedbase.OptOnErrorPause)
return b.job.PauseRequested(ctx, jobExec.Txn(), func(ctx context.Context,
return b.job.PauseRequested(ctx, nil /* txn */, func(ctx context.Context,
planHookState interface{}, txn *kv.Txn, progress *jobspb.Progress) error {
err := b.OnPauseRequest(ctx, jobExec, txn, progress)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6861,7 +6861,7 @@ func normalizeCDCExpression(t *testing.T, execCfgI interface{}, exprStr string)
})
defer cleanup()

execCtx := p.(sql.JobExecContext)
execCtx := p.(sql.PlanHookState)
_, _, err = cdceval.NormalizeAndValidateSelectForTarget(
context.Background(), execCtx, desc, target, sc, false, false, false,
)
Expand Down
3 changes: 3 additions & 0 deletions pkg/ccl/multiregionccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ go_test(
"multiregion_system_table_test.go",
"multiregion_test.go",
"region_test.go",
"region_util_test.go",
"regional_by_row_system_database_test.go",
"regional_by_row_test.go",
"roundtrips_test.go",
Expand Down Expand Up @@ -67,11 +68,13 @@ go_test(
"//pkg/sql/catalog/desctestutils",
"//pkg/sql/catalog/tabledesc",
"//pkg/sql/catalog/typedesc",
"//pkg/sql/enum",
"//pkg/sql/execinfra",
"//pkg/sql/parser",
"//pkg/sql/rowenc",
"//pkg/sql/sem/tree",
"//pkg/sql/sqlliveness",
"//pkg/sql/sqlliveness/slstorage",
"//pkg/sql/sqltestutils",
"//pkg/sql/tests",
"//pkg/sql/types",
Expand Down
97 changes: 89 additions & 8 deletions pkg/ccl/multiregionccl/multiregion_system_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc"
"github.com/cockroachdb/cockroach/pkg/sql/enum"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slstorage"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
Expand Down Expand Up @@ -176,10 +178,10 @@ func TestMrSystemDatabase(t *testing.T) {

t.Run("InUse", func(t *testing.T) {
query := `
SELECT id, addr, session_id, locality, crdb_region
FROM system.sql_instances
WHERE session_id IS NOT NULL
`
SELECT id, addr, session_id, locality, crdb_region
FROM system.sql_instances
WHERE session_id IS NOT NULL
`
rows := tDB.Query(t, query)
require.True(t, rows.Next())
for {
Expand Down Expand Up @@ -207,10 +209,10 @@ func TestMrSystemDatabase(t *testing.T) {

t.Run("Preallocated", func(t *testing.T) {
query := `
SELECT id, addr, session_id, locality, crdb_region
FROM system.sql_instances
WHERE session_id IS NULL
`
SELECT id, addr, session_id, locality, crdb_region
FROM system.sql_instances
WHERE session_id IS NULL
`
rows := tDB.Query(t, query)
require.True(t, rows.Next())
for {
Expand All @@ -234,3 +236,82 @@ func TestMrSystemDatabase(t *testing.T) {
})
})
}

func TestTenantStartupWithMultiRegionEnum(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
defer envutil.TestSetEnv(t, "COCKROACH_MR_SYSTEM_DATABASE", "1")()

// Enable settings required for configuring a tenant's system database as multi-region.
cs := cluster.MakeTestingClusterSettings()
sql.SecondaryTenantsMultiRegionAbstractionsEnabled.Override(context.Background(), &cs.SV, true)
sql.SecondaryTenantZoneConfigsEnabled.Override(context.Background(), &cs.SV, true)

tc, _, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster(
t, 3 /*numServers*/, base.TestingKnobs{}, multiregionccltestutils.WithSettings(cs),
)
defer cleanup()

tenID := roachpb.MustMakeTenantID(10)
ten, tSQL := serverutils.StartTenant(t, tc.Server(0), base.TestTenantArgs{
Settings: cs,
TenantID: tenID,
Locality: roachpb.Locality{
Tiers: []roachpb.Tier{
{Key: "region", Value: "us-east1"},
},
},
})
defer tSQL.Close()
tenSQLDB := sqlutils.MakeSQLRunner(tSQL)

// Update system database with regions.
tenSQLDB.Exec(t, `ALTER DATABASE system SET PRIMARY REGION "us-east1"`)
tenSQLDB.Exec(t, `ALTER DATABASE system ADD REGION "us-east2"`)
tenSQLDB.Exec(t, `ALTER DATABASE system ADD REGION "us-east3"`)

ten2, tSQL2 := serverutils.StartTenant(t, tc.Server(2), base.TestTenantArgs{
Settings: cs,
TenantID: tenID,
Existing: true,
Locality: roachpb.Locality{
Tiers: []roachpb.Tier{
{Key: "region", Value: "us-east3"},
},
},
})
defer tSQL2.Close()
tenSQLDB2 := sqlutils.MakeSQLRunner(tSQL2)

// The sqlliveness entry created by the first SQL server has enum.One as the
// region as the system database hasn't been updated when it first started.
var sessionID string
tenSQLDB2.QueryRow(t, `SELECT session_id FROM system.sql_instances WHERE id = $1`,
ten.SQLInstanceID()).Scan(&sessionID)
region, id, err := slstorage.UnsafeDecodeSessionID(sqlliveness.SessionID(sessionID))
require.NoError(t, err)
require.NotNil(t, id)
require.Equal(t, enum.One, region)

// Ensure that the sqlliveness entry created by the second SQL server has
// the right region and session UUID.
tenSQLDB2.QueryRow(t, `SELECT session_id FROM system.sql_instances WHERE id = $1`,
ten2.SQLInstanceID()).Scan(&sessionID)
region, id, err = slstorage.UnsafeDecodeSessionID(sqlliveness.SessionID(sessionID))
require.NoError(t, err)
require.NotNil(t, id)
require.NotEqual(t, enum.One, region)

rows := tenSQLDB2.Query(t, `SELECT crdb_region, session_uuid FROM system.sqlliveness`)
defer rows.Close()
livenessMap := map[string][]byte{}
for rows.Next() {
var region, sessionUUID string
require.NoError(t, rows.Scan(&region, &sessionUUID))
livenessMap[sessionUUID] = []byte(region)
}
require.NoError(t, rows.Err())
r, ok := livenessMap[string(id)]
require.True(t, ok)
require.Equal(t, r, region)
}
108 changes: 108 additions & 0 deletions pkg/ccl/multiregionccl/region_util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright 2022 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package multiregionccl_test

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/multiregionccl/multiregionccltestutils"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/require"
)

// TestGetLocalityRegionEnumPhysicalRepresentation is in the ccl package since
// it utilizes adding regions to a database.
func TestGetLocalityRegionEnumPhysicalRepresentation(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
tc, sqlDB, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster(
t, 3 /* numServers */, base.TestingKnobs{})
defer cleanup()

tDB := sqlutils.MakeSQLRunner(sqlDB)
tDB.Exec(t, `CREATE DATABASE foo PRIMARY REGION "us-east1" REGIONS "us-east1", "us-east2", "us-east3"`)

s0 := tc.ServerTyped(0)
ief := s0.InternalExecutorFactory().(descs.TxnManager)
dbID := sqlutils.QueryDatabaseID(t, sqlDB, "foo")

t.Run("with locality that exists", func(t *testing.T) {
regionEnum, err := sql.GetLocalityRegionEnumPhysicalRepresentation(
ctx, ief, s0.DB(), descpb.ID(dbID), roachpb.Locality{
Tiers: []roachpb.Tier{{Key: "region", Value: "us-east2"}},
},
)
require.NoError(t, err)

enumMembers := getEnumMembers(t, ctx, tc.Server(0), descpb.ID(dbID))
require.NotEmpty(t, enumMembers)
require.Equal(t, enumMembers["us-east2"], regionEnum)
})

t.Run("with non-existent locality", func(t *testing.T) {
regionEnum, err := sql.GetLocalityRegionEnumPhysicalRepresentation(
ctx, ief, s0.DB(), descpb.ID(dbID), roachpb.Locality{
Tiers: []roachpb.Tier{{Key: "region", Value: "europe-west1"}},
},
)
require.NoError(t, err)

// Fallback to primary region if the locality is provided, but non-existent.
enumMembers := getEnumMembers(t, ctx, tc.Server(0), descpb.ID(dbID))
require.NotEmpty(t, enumMembers)
require.Equal(t, enumMembers["us-east1"], regionEnum)
})

t.Run("without locality", func(t *testing.T) {
regionEnum, err := sql.GetLocalityRegionEnumPhysicalRepresentation(
ctx, ief, s0.DB(), descpb.ID(dbID), roachpb.Locality{})
require.NoError(t, err)

// Fallback to primary region is locality information is missing.
enumMembers := getEnumMembers(t, ctx, tc.Server(0), descpb.ID(dbID))
require.NotEmpty(t, enumMembers)
require.Equal(t, enumMembers["us-east1"], regionEnum)
})
}

func getEnumMembers(
t *testing.T, ctx context.Context, ts serverutils.TestServerInterface, dbID descpb.ID,
) map[string][]byte {
t.Helper()
enumMembers := make(map[string][]byte)
err := sql.TestingDescsTxn(ctx, ts, func(ctx context.Context, txn *kv.Txn, descsCol *descs.Collection) error {
_, dbDesc, err := descsCol.GetImmutableDatabaseByID(ctx, txn, dbID,
tree.DatabaseLookupFlags{Required: true})
require.NoError(t, err)
regionEnumID, err := dbDesc.MultiRegionEnumID()
require.NoError(t, err)
regionEnumDesc, err := descsCol.GetImmutableTypeByID(ctx, txn, regionEnumID,
tree.ObjectLookupFlags{CommonLookupFlags: tree.CommonLookupFlags{Required: true}})
require.NoError(t, err)
for ord := 0; ord < regionEnumDesc.NumEnumMembers(); ord++ {
enumMembers[regionEnumDesc.GetMemberLogicalRepresentation(ord)] = regionEnumDesc.GetMemberPhysicalRepresentation(ord)
}
return nil
})
require.NoError(t, err)
return enumMembers
}
Loading

0 comments on commit 5ebb4ca

Please sign in to comment.