Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
114264: regionliveness: minor nits for initial prober implementation r=rafiss a=rafiss

- Use QueryRowEx so that the prober always runs as a privileged node user.
- Avoid using SELECT *, since that can break if the columns change, and it makes the code harder to understand.
- Fix broken errors.Is usage and use HasType instead. Also changed the check to avoid an allocation.

Epic: CRDB-28158
informs: cockroachdb#113231
fixes: cockroachdb#114245
Release note: None

114375: sql: add issue numbers and clean up todos r=rharding6373 a=rharding6373

sql: add issue numbers and clean up todos

Epic: None
Release Note: None

114381: roachtest: bump timeout for tpcc/headroom/n4cpu16 r=renatolabs a=DarrylWong

When using metamorphic builds, this test would very rarely time out or almost time out. This change raises the timeout to 4 hours.

Epic: none
Release note: none
Fixes: cockroachdb#114281

Co-authored-by: Rafi Shamim <[email protected]>
Co-authored-by: rharding6373 <[email protected]>
Co-authored-by: DarrylWong <[email protected]>
  • Loading branch information
4 people committed Nov 13, 2023
4 parents 9fd7401 + 5d343fa + c50c379 + 4659efd commit 5de8693
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 38 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/multiregionccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ go_test(
"//pkg/sql/sem/tree",
"//pkg/sql/sqlinstance/instancestorage",
"//pkg/sql/sqlliveness",
"//pkg/sql/sqlliveness/slinstance",
"//pkg/sql/sqlliveness/slstorage",
"//pkg/sql/sqltestutils",
"//pkg/testutils",
Expand Down
45 changes: 23 additions & 22 deletions pkg/ccl/multiregionccl/regionliveness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
sql2 "github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/regionliveness"
regions2 "github.com/cockroachdb/cockroach/pkg/sql/regions"
"github.com/cockroachdb/cockroach/pkg/sql/regions"
"github.com/cockroachdb/cockroach/pkg/sql/sqlinstance/instancestorage"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
Expand All @@ -45,10 +46,13 @@ func TestRegionLivenessProber(t *testing.T) {

ctx := context.Background()

// Enable settings required for configuring a tenant's system database as multi-region.
// Enable settings required for configuring a tenant's system database as
// multi-region, and enable region liveness for testing.
makeSettings := func() *cluster.Settings {
cs := cluster.MakeTestingClusterSettings()
instancestorage.ReclaimLoopInterval.Override(ctx, &cs.SV, 150*time.Millisecond)
slinstance.DefaultTTL.Override(ctx, &cs.SV, 10*time.Second)
regionliveness.RegionLivenessEnabled.Override(ctx, &cs.SV, true)
return cs
}

Expand All @@ -57,7 +61,7 @@ func TestRegionLivenessProber(t *testing.T) {
"us-west",
"us-south",
}
cluster, _, cleanup := multiregionccltestutils.TestingCreateMultiRegionClusterWithRegionList(t,
testCluster, _, cleanup := multiregionccltestutils.TestingCreateMultiRegionClusterWithRegionList(t,
expectedRegions,
1,
base.TestingKnobs{},
Expand All @@ -70,44 +74,41 @@ func TestRegionLivenessProber(t *testing.T) {
var tenants []serverutils.ApplicationLayerInterface
var tenantSQL []*gosql.DB
blockProbeQuery := atomic.Bool{}
defer regionliveness.TestingSetProbeLivenessTimeout(500 * time.Millisecond)()

for _, s := range cluster.Servers {
for _, s := range testCluster.Servers {
tenantArgs := base.TestTenantArgs{
Settings: makeSettings(),
TenantID: id,
Locality: s.Locality(),
TestingKnobs: base.TestingKnobs{
SQLExecutor: &sql2.ExecutorTestingKnobs{
SQLExecutor: &sql.ExecutorTestingKnobs{
BeforeExecute: func(ctx context.Context, stmt string, descriptors *descs.Collection) {
const probeQuery = "SELECT * FROM system.sql_instances WHERE crdb_region = $1::system.crdb_internal_region"
if strings.Contains(stmt, probeQuery) &&
blockProbeQuery.Swap(false) {
const probeQuery = "SELECT count(*) FROM system.sql_instances WHERE crdb_region = $1::system.crdb_internal_region"
if strings.Contains(stmt, probeQuery) && blockProbeQuery.Swap(false) {
// Timeout this query intentionally.
time.Sleep(15 * time.Second)
time.Sleep(1 * time.Second)
}
},
},
},
}
ts, sql := serverutils.StartTenant(t, s, tenantArgs)
ts, tenantDB := serverutils.StartTenant(t, s, tenantArgs)
tenants = append(tenants, ts)
tenantSQL = append(tenantSQL, sql)
tenantSQL = append(tenantSQL, tenantDB)
}
// Enable region liveness for testing.
_, err = tenantSQL[0].Exec("SET CLUSTER SETTING sql.region_liveness.enabled=true")
require.NoError(t, err)
// Convert into a multi-region DB.
_, err = tenantSQL[0].Exec(fmt.Sprintf("ALTER DATABASE system SET PRIMARY REGION '%s'", cluster.Servers[0].Locality().Tiers[0].Value))
_, err = tenantSQL[0].Exec(fmt.Sprintf("ALTER DATABASE system SET PRIMARY REGION '%s'", testCluster.Servers[0].Locality().Tiers[0].Value))
require.NoError(t, err)
for i := 1; i < len(expectedRegions); i++ {
_, err = tenantSQL[0].Exec(fmt.Sprintf("ALTER DATABASE system ADD REGION '%s'", expectedRegions[i]))
require.NoError(t, err)
}
idb := tenants[0].InternalDB().(isql.DB)
cf := tenants[0].CollectionFactory().(*descs.CollectionFactory)
statusServer := tenants[0].SQLServer().(*sql2.Server).GetExecutorConfig().TenantStatusServer
statusServer := tenants[0].SQLServer().(*sql.Server).GetExecutorConfig().TenantStatusServer
providerFactory := func(txn *kv.Txn) regionliveness.RegionProvider {
return regions2.NewProvider(tenants[0].Codec(), statusServer, txn, cf.NewCollection(ctx))
return regions.NewProvider(tenants[0].Codec(), statusServer, txn, cf.NewCollection(ctx))
}
regionProber := regionliveness.NewLivenessProber(idb, providerFactory, tenants[0].ClusterSettings())
// Validates the expected regions versus the region liveness set.
Expand All @@ -123,18 +124,18 @@ func TestRegionLivenessProber(t *testing.T) {

// Validate all regions in the cluster are correctly reported as live.
testTxn := tenants[0].DB().NewTxn(ctx, "test-txn")
regions, err := regionProber.QueryLiveness(ctx, testTxn)
liveRegions, err := regionProber.QueryLiveness(ctx, testTxn)
require.NoError(t, err)
checkExpectedRegions(expectedRegions, regions)
checkExpectedRegions(expectedRegions, liveRegions)
// Attempt to probe all regions, they should be all up still.
for _, region := range expectedRegions {
require.NoError(t, regionProber.ProbeLiveness(ctx, region))
}
// Validate all regions in the cluster are still reported as live.
testTxn = tenants[0].DB().NewTxn(ctx, "test-txn")
regions, err = regionProber.QueryLiveness(ctx, testTxn)
liveRegions, err = regionProber.QueryLiveness(ctx, testTxn)
require.NoError(t, err)
checkExpectedRegions(expectedRegions, regions)
checkExpectedRegions(expectedRegions, liveRegions)
// Probe the liveness of the region, but timeout the query
// intentionally to make it seem dead.
blockProbeQuery.Store(true)
Expand Down
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/tpcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,7 @@ func registerTPCC(r registry.Registry) {
Suites: registry.Suites(registry.Nightly, registry.ReleaseQualification),
Tags: registry.Tags(`default`, `release_qualification`, `aws`),
Cluster: headroomSpec,
Timeout: 4 * time.Hour,
EncryptionSupport: registry.EncryptionMetamorphic,
Leases: registry.MetamorphicLeases,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/explain_bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -900,7 +900,7 @@ func (c *stmtEnvCollector) PrintRelevantCreateUdf(
) error {
// The select function_name returns a DOidWrapper,
// we need to cast it to string for queryRows function to process.
// TODO: consider getting the udf sql body statements from the memo metadata.
// TODO(#104976): consider getting the udf sql body statements from the memo metadata.
functionNameQuery := "SELECT function_name::STRING as function_name_str FROM [SHOW FUNCTIONS]"
udfNames, err := c.queryRows(functionNameQuery)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/logictest/testdata/logic_test/udf_record
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ $$
SELECT a, b;
$$ LANGUAGE SQL;

# TODO(harding): In postgres, calls to f_amb_setof should succeed and return 0 rows.
# TODO(#100928): In postgres, calls to f_amb_setof should succeed and return 0 rows.
statement error pgcode 42725 pq: ambiguous call: f_amb_setof\(int, unknown\), candidates are
SELECT f_amb_setof(1, NULL);

Expand All @@ -365,11 +365,11 @@ $$
SELECT a, b;
$$ LANGUAGE SQL;

# TODO(harding): In postgres, calls to f_amb should succeed and return NULL.
# TODO(#100928): In postgres, calls to f_amb should succeed and return NULL.
statement error pgcode 42725 pq: ambiguous call: f_amb\(int, unknown\), candidates are
SELECT f_amb(1, NULL);

# TODO(harding): In postgres, calls to f_amb as a data source should succeed
# TODO(#100928): In postgres, calls to f_amb as a data source should succeed
# and return NULL NULL.
statement error pgcode 42725 pq: ambiguous call: f_amb\(int, unknown\), candidates are
SELECT * FROM f_amb(1, NULL) as foo(a int, b int);
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/regionliveness/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ go_library(
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlliveness/slinstance",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
Expand Down
40 changes: 29 additions & 11 deletions pkg/sql/regionliveness/prober.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -61,6 +62,16 @@ type livenessProber struct {
settings *clustersettings.Settings
}

// probeLivenessTimeout is the timeout applied to the "probe-liveness"
// operation. It is a variable rather than a constant so that tests can
// shorten it through TestingSetProbeLivenessTimeout.
var probeLivenessTimeout = 15 * time.Second

// TestingSetProbeLivenessTimeout replaces the probe liveness timeout with
// newTimeout and returns a function that puts back the value that was in
// effect when this call was made. It is intended for tests, typically as
// `defer TestingSetProbeLivenessTimeout(d)()`.
func TestingSetProbeLivenessTimeout(newTimeout time.Duration) func() {
	previous := probeLivenessTimeout
	restore := func() { probeLivenessTimeout = previous }
	probeLivenessTimeout = newTimeout
	return restore
}

// NewLivenessProber creates a new region liveness prober.
func NewLivenessProber(
db isql.DB, regionProviderFactory RegionProviderFactory, settings *clustersettings.Settings,
Expand All @@ -79,19 +90,24 @@ func (l *livenessProber) ProbeLiveness(ctx context.Context, region string) error
return nil
}
const probeQuery = `
SELECT * FROM system.sql_instances WHERE crdb_region=$1::system.crdb_internal_region
SELECT count(*) FROM system.sql_instances WHERE crdb_region = $1::system.crdb_internal_region
`
err := timeutil.RunWithTimeout(ctx, "probe-liveness", time.Second*15,
err := timeutil.RunWithTimeout(ctx, "probe-liveness", probeLivenessTimeout,
func(ctx context.Context) error {
return l.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
_, err := txn.Exec(ctx, "probe-sql-instance", txn.KV(), probeQuery, region)
return err
_, err := txn.QueryRowEx(
ctx, "probe-sql-instance", txn.KV(), sessiondata.NodeUserSessionDataOverride,
probeQuery, region,
)
if err != nil {
return err
}
return nil
})
})

// Region is alive or we hit some other error.
if err == nil ||
!IsQueryTimeoutErr(err) {
if err == nil || !IsQueryTimeoutErr(err) {
return err
}

Expand Down Expand Up @@ -137,17 +153,19 @@ func (l *livenessProber) QueryLiveness(ctx context.Context, txn *kv.Txn) (LiveRe
return regionStatus, nil
}
// Detect any down regions and remove them.
rows, err := executor.QueryBuffered(ctx, "query-region-liveness", txn,
"SELECT * FROM system.region_liveness")
rows, err := executor.QueryBufferedEx(
ctx, "query-region-liveness", txn, sessiondata.NodeUserSessionDataOverride,
"SELECT crdb_region, unavailable_at FROM system.region_liveness",
)
if err != nil {
return nil, err
}
for _, row := range rows {
enum, _ := tree.AsDEnum(row[0])
timestamp := tree.MustBeDTimestamp(row[1])
unavailableAt := tree.MustBeDTimestamp(row[1])
// Region is now officially unavailable, so let's remove
// it.
if txn.ReadTimestamp().GoTime().After(timestamp.Time) {
if txn.ReadTimestamp().GoTime().After(unavailableAt.Time) {
delete(regionStatus, enum.LogicalRep)
}
}
Expand All @@ -158,5 +176,5 @@ func (l *livenessProber) QueryLiveness(ctx context.Context, txn *kv.Txn) (LiveRe
// when checking for region liveness.
func IsQueryTimeoutErr(err error) bool {
return pgerror.GetPGCode(err) == pgcode.QueryCanceled ||
errors.Is(err, &timeutil.TimeoutError{})
errors.HasType(err, (*timeutil.TimeoutError)(nil))
}
1 change: 0 additions & 1 deletion pkg/sql/sem/tree/stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,6 @@ func CanWriteData(stmt Statement) bool {

// ReturnsAtMostOneRow returns true if the statement returns either no rows or
// a single row.
// TODO(harding): Expand this list.
func ReturnsAtMostOneRow(stmt Statement) bool {
switch stmt.(type) {
// Import operations.
Expand Down

0 comments on commit 5de8693

Please sign in to comment.