Skip to content

Commit

Permalink
regionliveness: minor nits for initial proober implementation
Browse files Browse the repository at this point in the history
- Use QueryRowEx so that the prober always runs as a privilged node user.
- Avoid using SELECT *, since that can break if the columns change, and
  it makes the code harder to understand.
- Fix broken errors.Is usage and use HasType instead. Also changed the
  check to avoid an allocation.
- Cleaned up test to override timeouts so it doesn't take as long.

Release note: None
  • Loading branch information
rafiss committed Nov 13, 2023
1 parent e1c2d15 commit 5d343fa
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 33 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/multiregionccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ go_test(
"//pkg/sql/sem/tree",
"//pkg/sql/sqlinstance/instancestorage",
"//pkg/sql/sqlliveness",
"//pkg/sql/sqlliveness/slinstance",
"//pkg/sql/sqlliveness/slstorage",
"//pkg/sql/sqltestutils",
"//pkg/testutils",
Expand Down
45 changes: 23 additions & 22 deletions pkg/ccl/multiregionccl/regionliveness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
sql2 "github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/regionliveness"
regions2 "github.com/cockroachdb/cockroach/pkg/sql/regions"
"github.com/cockroachdb/cockroach/pkg/sql/regions"
"github.com/cockroachdb/cockroach/pkg/sql/sqlinstance/instancestorage"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
Expand All @@ -45,10 +46,13 @@ func TestRegionLivenessProber(t *testing.T) {

ctx := context.Background()

// Enable settings required for configuring a tenant's system database as multi-region.
// Enable settings required for configuring a tenant's system database as
// multi-region. and enable region liveness for testing.
makeSettings := func() *cluster.Settings {
cs := cluster.MakeTestingClusterSettings()
instancestorage.ReclaimLoopInterval.Override(ctx, &cs.SV, 150*time.Millisecond)
slinstance.DefaultTTL.Override(ctx, &cs.SV, 10*time.Second)
regionliveness.RegionLivenessEnabled.Override(ctx, &cs.SV, true)
return cs
}

Expand All @@ -57,7 +61,7 @@ func TestRegionLivenessProber(t *testing.T) {
"us-west",
"us-south",
}
cluster, _, cleanup := multiregionccltestutils.TestingCreateMultiRegionClusterWithRegionList(t,
testCluster, _, cleanup := multiregionccltestutils.TestingCreateMultiRegionClusterWithRegionList(t,
expectedRegions,
1,
base.TestingKnobs{},
Expand All @@ -70,44 +74,41 @@ func TestRegionLivenessProber(t *testing.T) {
var tenants []serverutils.ApplicationLayerInterface
var tenantSQL []*gosql.DB
blockProbeQuery := atomic.Bool{}
defer regionliveness.TestingSetProbeLivenessTimeout(500 * time.Millisecond)()

for _, s := range cluster.Servers {
for _, s := range testCluster.Servers {
tenantArgs := base.TestTenantArgs{
Settings: makeSettings(),
TenantID: id,
Locality: s.Locality(),
TestingKnobs: base.TestingKnobs{
SQLExecutor: &sql2.ExecutorTestingKnobs{
SQLExecutor: &sql.ExecutorTestingKnobs{
BeforeExecute: func(ctx context.Context, stmt string, descriptors *descs.Collection) {
const probeQuery = "SELECT * FROM system.sql_instances WHERE crdb_region = $1::system.crdb_internal_region"
if strings.Contains(stmt, probeQuery) &&
blockProbeQuery.Swap(false) {
const probeQuery = "SELECT count(*) FROM system.sql_instances WHERE crdb_region = $1::system.crdb_internal_region"
if strings.Contains(stmt, probeQuery) && blockProbeQuery.Swap(false) {
// Timeout this query intentionally.
time.Sleep(15 * time.Second)
time.Sleep(1 * time.Second)
}
},
},
},
}
ts, sql := serverutils.StartTenant(t, s, tenantArgs)
ts, tenantDB := serverutils.StartTenant(t, s, tenantArgs)
tenants = append(tenants, ts)
tenantSQL = append(tenantSQL, sql)
tenantSQL = append(tenantSQL, tenantDB)
}
// Enable region liveness for testing.
_, err = tenantSQL[0].Exec("SET CLUSTER SETTING sql.region_liveness.enabled=true")
require.NoError(t, err)
// Convert into a multi-region DB.
_, err = tenantSQL[0].Exec(fmt.Sprintf("ALTER DATABASE system SET PRIMARY REGION '%s'", cluster.Servers[0].Locality().Tiers[0].Value))
_, err = tenantSQL[0].Exec(fmt.Sprintf("ALTER DATABASE system SET PRIMARY REGION '%s'", testCluster.Servers[0].Locality().Tiers[0].Value))
require.NoError(t, err)
for i := 1; i < len(expectedRegions); i++ {
_, err = tenantSQL[0].Exec(fmt.Sprintf("ALTER DATABASE system ADD REGION '%s'", expectedRegions[i]))
require.NoError(t, err)
}
idb := tenants[0].InternalDB().(isql.DB)
cf := tenants[0].CollectionFactory().(*descs.CollectionFactory)
statusServer := tenants[0].SQLServer().(*sql2.Server).GetExecutorConfig().TenantStatusServer
statusServer := tenants[0].SQLServer().(*sql.Server).GetExecutorConfig().TenantStatusServer
providerFactory := func(txn *kv.Txn) regionliveness.RegionProvider {
return regions2.NewProvider(tenants[0].Codec(), statusServer, txn, cf.NewCollection(ctx))
return regions.NewProvider(tenants[0].Codec(), statusServer, txn, cf.NewCollection(ctx))
}
regionProber := regionliveness.NewLivenessProber(idb, providerFactory, tenants[0].ClusterSettings())
// Validates the expected regions versus the region liveness set.
Expand All @@ -123,18 +124,18 @@ func TestRegionLivenessProber(t *testing.T) {

// Validate all regions in the cluster are correctly reported as live.
testTxn := tenants[0].DB().NewTxn(ctx, "test-txn")
regions, err := regionProber.QueryLiveness(ctx, testTxn)
liveRegions, err := regionProber.QueryLiveness(ctx, testTxn)
require.NoError(t, err)
checkExpectedRegions(expectedRegions, regions)
checkExpectedRegions(expectedRegions, liveRegions)
// Attempt to probe all regions, they should be all up still.
for _, region := range expectedRegions {
require.NoError(t, regionProber.ProbeLiveness(ctx, region))
}
// Validate all regions in the cluster are still reported as live.
testTxn = tenants[0].DB().NewTxn(ctx, "test-txn")
regions, err = regionProber.QueryLiveness(ctx, testTxn)
liveRegions, err = regionProber.QueryLiveness(ctx, testTxn)
require.NoError(t, err)
checkExpectedRegions(expectedRegions, regions)
checkExpectedRegions(expectedRegions, liveRegions)
// Probe the liveness of the region, but timeout the query
// intentionally to make it seem dead.
blockProbeQuery.Store(true)
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/regionliveness/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ go_library(
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlliveness/slinstance",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
Expand Down
40 changes: 29 additions & 11 deletions pkg/sql/regionliveness/prober.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -61,6 +62,16 @@ type livenessProber struct {
settings *clustersettings.Settings
}

var probeLivenessTimeout = 15 * time.Second

func TestingSetProbeLivenessTimeout(newTimeout time.Duration) func() {
oldTimeout := probeLivenessTimeout
probeLivenessTimeout = newTimeout
return func() {
probeLivenessTimeout = oldTimeout
}
}

// NewLivenessProber creates a new region liveness prober.
func NewLivenessProber(
db isql.DB, regionProviderFactory RegionProviderFactory, settings *clustersettings.Settings,
Expand All @@ -79,19 +90,24 @@ func (l *livenessProber) ProbeLiveness(ctx context.Context, region string) error
return nil
}
const probeQuery = `
SELECT * FROM system.sql_instances WHERE crdb_region=$1::system.crdb_internal_region
SELECT count(*) FROM system.sql_instances WHERE crdb_region = $1::system.crdb_internal_region
`
err := timeutil.RunWithTimeout(ctx, "probe-liveness", time.Second*15,
err := timeutil.RunWithTimeout(ctx, "probe-liveness", probeLivenessTimeout,
func(ctx context.Context) error {
return l.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
_, err := txn.Exec(ctx, "probe-sql-instance", txn.KV(), probeQuery, region)
return err
_, err := txn.QueryRowEx(
ctx, "probe-sql-instance", txn.KV(), sessiondata.NodeUserSessionDataOverride,
probeQuery, region,
)
if err != nil {
return err
}
return nil
})
})

// Region is alive or we hit some other error.
if err == nil ||
!IsQueryTimeoutErr(err) {
if err == nil || !IsQueryTimeoutErr(err) {
return err
}

Expand Down Expand Up @@ -137,17 +153,19 @@ func (l *livenessProber) QueryLiveness(ctx context.Context, txn *kv.Txn) (LiveRe
return regionStatus, nil
}
// Detect and down regions and remove them.
rows, err := executor.QueryBuffered(ctx, "query-region-liveness", txn,
"SELECT * FROM system.region_liveness")
rows, err := executor.QueryBufferedEx(
ctx, "query-region-liveness", txn, sessiondata.NodeUserSessionDataOverride,
"SELECT crdb_region, unavailable_at FROM system.region_liveness",
)
if err != nil {
return nil, err
}
for _, row := range rows {
enum, _ := tree.AsDEnum(row[0])
timestamp := tree.MustBeDTimestamp(row[1])
unavailableAt := tree.MustBeDTimestamp(row[1])
// Region is now officially unavailable, so lets remove
// it.
if txn.ReadTimestamp().GoTime().After(timestamp.Time) {
if txn.ReadTimestamp().GoTime().After(unavailableAt.Time) {
delete(regionStatus, enum.LogicalRep)
}
}
Expand All @@ -158,5 +176,5 @@ func (l *livenessProber) QueryLiveness(ctx context.Context, txn *kv.Txn) (LiveRe
// when checking for region liveness.
func IsQueryTimeoutErr(err error) bool {
return pgerror.GetPGCode(err) == pgcode.QueryCanceled ||
errors.Is(err, &timeutil.TimeoutError{})
errors.HasType(err, (*timeutil.TimeoutError)(nil))
}

0 comments on commit 5d343fa

Please sign in to comment.