diff --git a/pkg/ccl/multiregionccl/BUILD.bazel b/pkg/ccl/multiregionccl/BUILD.bazel index 307bcec3be88..42d1ccc44d1c 100644 --- a/pkg/ccl/multiregionccl/BUILD.bazel +++ b/pkg/ccl/multiregionccl/BUILD.bazel @@ -28,6 +28,7 @@ go_test( srcs = [ "datadriven_test.go", "main_test.go", + "multiregion_system_table_test.go", "multiregion_test.go", "region_test.go", "regional_by_row_test.go", @@ -63,10 +64,12 @@ go_test( "//pkg/sql/catalog/descpb", "//pkg/sql/catalog/descs", "//pkg/sql/catalog/desctestutils", + "//pkg/sql/enum", "//pkg/sql/execinfra", "//pkg/sql/parser", "//pkg/sql/rowenc", "//pkg/sql/sem/tree", + "//pkg/sql/sqlliveness/slstorage", "//pkg/sql/sqltestutils", "//pkg/sql/tests", "//pkg/testutils", @@ -75,12 +78,17 @@ go_test( "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", "//pkg/util", + "//pkg/util/envutil", + "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/log", "//pkg/util/randutil", "//pkg/util/syncutil", + "//pkg/util/timeutil", "//pkg/util/tracing", "//pkg/util/tracing/tracingpb", + "//pkg/util/uuid", + "@com_github_cockroachdb_apd_v3//:apd", "@com_github_cockroachdb_datadriven//:datadriven", "@com_github_cockroachdb_errors//:errors", "@com_github_stretchr_testify//require", diff --git a/pkg/ccl/multiregionccl/multiregion_system_table_test.go b/pkg/ccl/multiregionccl/multiregion_system_table_test.go new file mode 100644 index 000000000000..2acc44313642 --- /dev/null +++ b/pkg/ccl/multiregionccl/multiregion_system_table_test.go @@ -0,0 +1,121 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package multiregionccl + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/cockroachdb/apd/v3" + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/ccl/multiregionccl/multiregionccltestutils" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/enum" + "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slstorage" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/envutil" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/stretchr/testify/require" +) + +func createSqllivenessTable( + t *testing.T, db *sqlutils.SQLRunner, dbName string, +) (tableID descpb.ID) { + t.Helper() + db.Exec(t, fmt.Sprintf(` + CREATE DATABASE IF NOT EXISTS "%s" + WITH PRIMARY REGION "us-east1" + REGIONS "us-east1", "us-east2", "us-east3" + `, dbName)) + + // expiration needs to be column 2. slstorage.Table assumes the column id. + // session_uuid and crdb_region are identified by their location in the + // primary key. + db.Exec(t, fmt.Sprintf(` + CREATE TABLE "%s".sqlliveness ( + session_uuid BYTES NOT NULL, + expiration DECIMAL NOT NULL, + crdb_region "%s".public.crdb_internal_region, + PRIMARY KEY(crdb_region, session_uuid) + ) LOCALITY REGIONAL BY ROW; + `, dbName, dbName)) + db.QueryRow(t, ` + select u.id + from system.namespace t + join system.namespace u + on t.id = u."parentID" + where t.name = $1 and u.name = $2`, + dbName, "sqlliveness").Scan(&tableID) + return tableID +} + +func TestRbrSqllivenessTable(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + defer envutil.TestSetEnv(t, "COCKROACH_MR_SYSTEM_DATABASE", "1")() + + ctx := context.Background() + + cluster, sqlDB, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster(t, 3, base.TestingKnobs{}) + defer cleanup() + kvDB := cluster.Servers[0].DB() + settings := cluster.Servers[0].Cfg.Settings + stopper := cluster.Servers[0].Stopper() + + tDB := sqlutils.MakeSQLRunner(sqlDB) + + t0 := time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC) + timeSource := timeutil.NewManualTime(t0) + clock := hlc.NewClock(timeSource, base.DefaultMaxClockOffset) + + setup := func(t *testing.T) *slstorage.Storage { + dbName := t.Name() + tableID := createSqllivenessTable(t, tDB, dbName) + var ambientCtx log.AmbientContext + // rbrIndexID is the index id used to access the regional by row index in + // tests. In production it will be index 2, but the freshly created test table + // will have index 1. + const rbrIndexID = 1 + return slstorage.NewTestingStorage(ambientCtx, stopper, clock, kvDB, keys.SystemSQLCodec, settings, + tableID, rbrIndexID, timeSource.NewTimer) + } + + t.Run("SqlRead", func(t *testing.T) { + storage := setup(t) + + initialUUID := uuid.MakeV4() + session, err := slstorage.MakeSessionID(enum.One, initialUUID) + require.NoError(t, err) + + writeExpiration := clock.Now().Add(10, 00) + require.NoError(t, storage.Insert(ctx, session, writeExpiration)) + + var sessionUUID string + var crdbRegion string + var rawExpiration apd.Decimal + + row := tDB.QueryRow(t, fmt.Sprintf(`SELECT crdb_region, session_uuid, expiration FROM "%s".sqlliveness`, t.Name())) + row.Scan(&crdbRegion, &sessionUUID, &rawExpiration) + + require.Contains(t, []string{"us-east1", "us-east2", "us-east3"}, crdbRegion) + require.Equal(t, sessionUUID, string(initialUUID.GetBytes())) + + readExpiration, err := hlc.DecimalToHLC(&rawExpiration) + require.NoError(t, err) + + require.Equal(t, writeExpiration, readExpiration) + }) +} diff --git a/pkg/sql/catalog/systemschema/BUILD.bazel b/pkg/sql/catalog/systemschema/BUILD.bazel index 575579de3bb3..da3731e2d8d3 100644 --- a/pkg/sql/catalog/systemschema/BUILD.bazel +++ b/pkg/sql/catalog/systemschema/BUILD.bazel @@ -19,6 +19,7 @@ go_library( "//pkg/sql/sem/catconstants", "//pkg/sql/sem/tree", "//pkg/sql/types", + "//pkg/util/envutil", "//pkg/util/log", ], ) diff --git a/pkg/sql/catalog/systemschema/system.go b/pkg/sql/catalog/systemschema/system.go index 83f908b12b9a..e86a46c93c8f 100644 --- a/pkg/sql/catalog/systemschema/system.go +++ b/pkg/sql/catalog/systemschema/system.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/log" ) @@ -460,7 +461,16 @@ CREATE TABLE system.sqlliveness ( session_id BYTES NOT NULL, expiration DECIMAL NOT NULL, CONSTRAINT "primary" PRIMARY KEY (session_id), - FAMILY fam0_session_id_expiration (session_id, expiration) + FAMILY fam0_session_id_expiration (session_id, expiration) +)` + + MrSqllivenessTableSchema = ` +CREATE TABLE system.sqlliveness ( + session_uuid BYTES NOT NULL, + expiration DECIMAL NOT NULL, + crdb_region BYTES NOT NULL, + CONSTRAINT "primary" PRIMARY KEY (crdb_region, session_uuid), + FAMILY "primary" (crdb_region, session_uuid, expiration) )` MigrationsTableSchema = ` @@ -1996,26 +2006,61 @@ var ( )) // SqllivenessTable is the descriptor for the sqlliveness table. - SqllivenessTable = registerSystemTable( - SqllivenessTableSchema, - systemTable( - catconstants.SqllivenessTableName, - keys.SqllivenessID, - []descpb.ColumnDescriptor{ - {Name: "session_id", ID: 1, Type: types.Bytes, Nullable: false}, - {Name: "expiration", ID: 2, Type: types.Decimal, Nullable: false}, - }, - []descpb.ColumnFamilyDescriptor{ - { - Name: "fam0_session_id_expiration", - ID: 0, - ColumnNames: []string{"session_id", "expiration"}, - ColumnIDs: []descpb.ColumnID{1, 2}, - DefaultColumnID: 2, + // + // TODO(jeffswenson): remove the function wrapper around the + // SqllivenessTable descriptor. See TestSupportMultiRegion for context. + SqllivenessTable = func() catalog.TableDescriptor { + if TestSupportMultiRegion() { + return registerSystemTable( + MrSqllivenessTableSchema, + systemTable( + catconstants.SqllivenessTableName, + keys.SqllivenessID, + []descpb.ColumnDescriptor{ + {Name: "crdb_region", ID: 4, Type: types.Bytes, Nullable: false}, + {Name: "session_uuid", ID: 3, Type: types.Bytes, Nullable: false}, + {Name: "expiration", ID: 2, Type: types.Decimal, Nullable: false}, + }, + []descpb.ColumnFamilyDescriptor{ + { + Name: "primary", + ID: 0, + ColumnNames: []string{"crdb_region", "session_uuid", "expiration"}, + ColumnIDs: []descpb.ColumnID{4, 3, 2}, + DefaultColumnID: 2, + }, + }, + descpb.IndexDescriptor{ + Name: "primary", + ID: 2, + Unique: true, + KeyColumnNames: []string{"crdb_region", "session_uuid"}, + KeyColumnDirections: []catpb.IndexColumn_Direction{catpb.IndexColumn_ASC, catpb.IndexColumn_ASC}, + KeyColumnIDs: []descpb.ColumnID{4, 3}, + }, + )) + } + return registerSystemTable( + SqllivenessTableSchema, + systemTable( + catconstants.SqllivenessTableName, + keys.SqllivenessID, + []descpb.ColumnDescriptor{ + {Name: "session_id", ID: 1, Type: types.Bytes, Nullable: false}, + {Name: "expiration", ID: 2, Type: types.Decimal, Nullable: false}, }, - }, - pk("session_id"), - )) + []descpb.ColumnFamilyDescriptor{ + { + Name: "fam0_session_id_expiration", + ID: 0, + ColumnNames: []string{"session_id", "expiration"}, + ColumnIDs: []descpb.ColumnID{1, 2}, + DefaultColumnID: 2, + }, + }, + pk("session_id"), + )) + }() // MigrationsTable is the descriptor for the migrations table. It stores facts // about the completion state of long-running migrations. It is used to @@ -2579,3 +2624,14 @@ var ( // SpanConfigurationsTableName represents system.span_configurations. var SpanConfigurationsTableName = tree.NewTableNameWithSchema("system", tree.PublicSchemaName, tree.Name(catconstants.SpanConfigurationsTableName)) + +// TestSupportMultiRegion returns true if the cluster should support multi-region +// optimized system databases. +// +// TODO(jeffswenson): remove TestSupportMultiRegion after implementing +// migrations and version gates to migrate to the new regional by row +// compatible schemas. The helper exists to allow e2e testing of the in +// development multi-region system database features. +func TestSupportMultiRegion() bool { + return envutil.EnvOrDefaultBool("COCKROACH_MR_SYSTEM_DATABASE", false) +} diff --git a/pkg/sql/sqlliveness/slstorage/BUILD.bazel b/pkg/sql/sqlliveness/slstorage/BUILD.bazel index c11a02670628..1f8a7d824e38 100644 --- a/pkg/sql/sqlliveness/slstorage/BUILD.bazel +++ b/pkg/sql/sqlliveness/slstorage/BUILD.bazel @@ -4,6 +4,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "slstorage", srcs = [ + "key_encoder.go", "metrics.go", "sessionid.go", "slstorage.go", @@ -18,7 +19,8 @@ go_library( "//pkg/roachpb", "//pkg/settings", "//pkg/settings/cluster", - "//pkg/sql/catalog/descpb", + "//pkg/sql/catalog/systemschema", + "//pkg/sql/sem/catid", "//pkg/sql/sem/eval", "//pkg/sql/sqlliveness", "//pkg/util/cache", @@ -42,13 +44,14 @@ go_test( name = "slstorage_test", size = "small", srcs = [ + "key_encoder_test.go", "main_test.go", "sessionid_test.go", "slstorage_test.go", ], args = ["-test.timeout=55s"], + embed = [":slstorage"], deps = [ - ":slstorage", "//pkg/base", "//pkg/keys", "//pkg/kv/kvserver", @@ -65,6 +68,7 @@ go_test( "//pkg/testutils/serverutils", "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", + "//pkg/util/envutil", "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/log", diff --git a/pkg/sql/sqlliveness/slstorage/key_encoder.go b/pkg/sql/sqlliveness/slstorage/key_encoder.go new file mode 100644 index 000000000000..1c4ae8c84e0e --- /dev/null +++ b/pkg/sql/sqlliveness/slstorage/key_encoder.go @@ -0,0 +1,127 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package slstorage + +import ( + "bytes" + + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" + "github.com/cockroachdb/cockroach/pkg/sql/sem/catid" + "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" + "github.com/cockroachdb/cockroach/pkg/util/encoding" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/cockroachdb/errors" +) + +// keyCodec manages the SessionID <-> roachpb.Key mapping. +type keyCodec interface { + encode(sid sqlliveness.SessionID) (roachpb.Key, error) + decode(key roachpb.Key) (sqlliveness.SessionID, error) + + // indexPrefix returns the prefix for an encoded key. encode() will return + // something with the prefix and decode will expect a key with this prefix. + // + // indexPrefix() and indexPrefix.PrefixEnd() may be used to scan the + // content of the table. + indexPrefix() roachpb.Key +} + +// makeKeyCodec constructs a key codec. It consults the +// COCKROACH_MR_SYSTEM_DATABASE environment variable to determine if it should +// use the regional by table or regional by row index format. +func makeKeyCodec(codec keys.SQLCodec, tableID catid.DescID, rbrIndex catid.IndexID) keyCodec { + if systemschema.TestSupportMultiRegion() { + return &rbrEncoder{codec.IndexPrefix(uint32(tableID), uint32(rbrIndex))} + } + const rbtIndexID = 1 + return &rbtEncoder{codec.IndexPrefix(uint32(tableID), rbtIndexID)} +} + +type rbrEncoder struct { + rbrIndex roachpb.Key +} + +func (e *rbrEncoder) encode(session sqlliveness.SessionID) (roachpb.Key, error) { + region, uuid, err := UnsafeDecodeSessionID(session) + if err != nil { + return nil, err + } + if len(region) == 0 { + return nil, errors.Newf("legacy session passed to rbr table: '%s'", session.String()) + } + + const columnFamilyID = 0 + + key := e.indexPrefix() + key = encoding.EncodeBytesAscending(key, region) + key = encoding.EncodeBytesAscending(key, uuid) + return keys.MakeFamilyKey(key, columnFamilyID), nil +} + +func (e *rbrEncoder) decode(key roachpb.Key) (sqlliveness.SessionID, error) { + if !bytes.HasPrefix(key, e.rbrIndex) { + return "", errors.Newf("sqlliveness table key has an invalid prefix: %v", key) + } + rem := key[len(e.rbrIndex):] + + rem, region, err := encoding.DecodeBytesAscending(rem, nil) + if err != nil { + return "", errors.Wrap(err, "failed to decode region from session key") + } + + rem, rawUUID, err := encoding.DecodeBytesAscending(rem, nil) + if err != nil { + return "", errors.Wrap(err, "failed to decode uuid from session key") + } + + id, err := uuid.FromBytes(rawUUID) + if err != nil { + return "", errors.Wrap(err, "failed to convert uuid bytes to uuid id for session key") + } + + return MakeSessionID(region, id) +} + +func (e *rbrEncoder) indexPrefix() roachpb.Key { + return e.rbrIndex.Clone() +} + +type rbtEncoder struct { + rbtIndex roachpb.Key +} + +func (e *rbtEncoder) encode(id sqlliveness.SessionID) (roachpb.Key, error) { + const columnFamilyID = 0 + + key := e.indexPrefix() + key = encoding.EncodeBytesAscending(key, id.UnsafeBytes()) + return keys.MakeFamilyKey(key, columnFamilyID), nil +} + +func (e *rbtEncoder) decode(key roachpb.Key) (sqlliveness.SessionID, error) { + if !bytes.HasPrefix(key, e.rbtIndex) { + return "", errors.Newf("sqlliveness table key has an invalid prefix: %v", key) + } + rem := key[len(e.rbtIndex):] + + rem, session, err := encoding.DecodeBytesAscending(rem, nil) + if err != nil { + return "", errors.Wrap(err, "failed to decode region from session key") + } + + return sqlliveness.SessionID(session), nil +} + +func (e *rbtEncoder) indexPrefix() roachpb.Key { + return e.rbtIndex.Clone() +} diff --git a/pkg/sql/sqlliveness/slstorage/key_encoder_test.go b/pkg/sql/sqlliveness/slstorage/key_encoder_test.go new file mode 100644 index 000000000000..30eaedfeb446 --- /dev/null +++ b/pkg/sql/sqlliveness/slstorage/key_encoder_test.go @@ -0,0 +1,91 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package slstorage + +import ( + "bytes" + "testing" + + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" + "github.com/cockroachdb/cockroach/pkg/sql/enum" + "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" + "github.com/cockroachdb/cockroach/pkg/util/envutil" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/stretchr/testify/require" +) + +func TestKeyEncoder(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + t.Run("RegionalByRow", func(t *testing.T) { + defer envutil.TestSetEnv(t, "COCKROACH_MR_SYSTEM_DATABASE", "1")() + testKeyEncoder(t) + }) + t.Run("RegionalByTable", func(t *testing.T) { + defer envutil.TestSetEnv(t, "COCKROACH_MR_SYSTEM_DATABASE", "0")() + testKeyEncoder(t) + }) +} + +func testKeyEncoder(t *testing.T) { + codec := keys.MakeSQLCodec(roachpb.MakeTenantID(1337)) + keyCodec := makeKeyCodec(codec, 42, 2) + + t.Run("Prefix", func(t *testing.T) { + prefix := keyCodec.indexPrefix() + + rem, tenant, err := keys.DecodeTenantPrefix(prefix) + require.NoError(t, err) + require.Equal(t, tenant, roachpb.MakeTenantID(1337)) + + rem, tableID, indexID, err := keys.DecodeTableIDIndexID(rem) + require.NoError(t, err) + require.Equal(t, tableID, uint32(42)) + require.Len(t, rem, 0) + if systemschema.TestSupportMultiRegion() { + require.Equal(t, indexID, uint32(2)) + } else { + require.Equal(t, indexID, uint32(1)) + } + }) + + t.Run("RoundTrip", func(t *testing.T) { + id, err := MakeSessionID(enum.One, uuid.MakeV4()) + require.NoError(t, err) + + key, err := keyCodec.encode(id) + require.NoError(t, err) + require.True(t, bytes.HasPrefix(key, keyCodec.indexPrefix())) + + decodedID, err := keyCodec.decode(key) + require.NoError(t, err) + require.Equal(t, id, decodedID) + }) + + t.Run("EncodeLegacySession", func(t *testing.T) { + id := sqlliveness.SessionID(uuid.MakeV4().GetBytes()) + + key, err := keyCodec.encode(id) + if systemschema.TestSupportMultiRegion() { + require.Error(t, err) + } else { + require.NoError(t, err) + decodedID, err := keyCodec.decode(key) + require.NoError(t, err) + require.Equal(t, id, decodedID) + } + }) +} diff --git a/pkg/sql/sqlliveness/slstorage/sessionid_test.go b/pkg/sql/sqlliveness/slstorage/sessionid_test.go index 9b4669080e79..8a46cb008738 100644 --- a/pkg/sql/sqlliveness/slstorage/sessionid_test.go +++ b/pkg/sql/sqlliveness/slstorage/sessionid_test.go @@ -151,6 +151,7 @@ func TestSessionIDEncoding(t *testing.T) { require.Error(t, err) require.Contains(t, err.Error(), tc.err) } else { + require.NoError(t, err) require.Equal(t, region, tc.region) require.Equal(t, uuid, tc.id.GetBytes()) } diff --git a/pkg/sql/sqlliveness/slstorage/slstorage.go b/pkg/sql/sqlliveness/slstorage/slstorage.go index 3daeee40a251..52ba649d64cb 100644 --- a/pkg/sql/sqlliveness/slstorage/slstorage.go +++ b/pkg/sql/sqlliveness/slstorage/slstorage.go @@ -21,7 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/sem/catid" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" "github.com/cockroachdb/cockroach/pkg/util/cache" @@ -89,7 +89,7 @@ type Storage struct { metrics Metrics gcInterval func() time.Duration newTimer func() timeutil.TimerI - tableID descpb.ID + keyCodec keyCodec mu struct { syncutil.Mutex @@ -119,7 +119,8 @@ func NewTestingStorage( db *kv.DB, codec keys.SQLCodec, settings *cluster.Settings, - sqllivenessTableID descpb.ID, + sqllivenessTableID catid.DescID, + rbrIndexID catid.IndexID, newTimer func() timeutil.TimerI, ) *Storage { s := &Storage{ @@ -130,7 +131,7 @@ func NewTestingStorage( clock: clock, db: db, codec: codec, - tableID: sqllivenessTableID, + keyCodec: makeKeyCodec(codec, sqllivenessTableID, rbrIndexID), newTimer: newTimer, gcInterval: func() time.Duration { baseInterval := GCInterval.Get(&settings.SV) @@ -160,7 +161,8 @@ func NewStorage( codec keys.SQLCodec, settings *cluster.Settings, ) *Storage { - return NewTestingStorage(ambientCtx, stopper, clock, db, codec, settings, keys.SqllivenessID, + const rbrIndexID = 2 + return NewTestingStorage(ambientCtx, stopper, clock, db, codec, settings, keys.SqllivenessID, rbrIndexID, timeutil.DefaultTimeSource{}.NewTimer) } @@ -311,7 +313,10 @@ func (s *Storage) deleteOrFetchSession( // Reset captured variable in case of retry. deleted, expiration, prevExpiration = false, hlc.Timestamp{}, hlc.Timestamp{} - k := s.makeSessionKey(sid) + k, err := s.keyCodec.encode(sid) + if err != nil { + return err + } kv, err := txn.Get(ctx, k) if err != nil { return err @@ -409,7 +414,7 @@ func (s *Storage) fetchExpiredSessionIDs(ctx context.Context) ([]sqlliveness.Ses var toCheck []sqlliveness.SessionID if err := s.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { toCheck = nil // reset for restarts - start := s.makeTablePrefix() + start := s.keyCodec.indexPrefix() end := start.PrefixEnd() now := s.clock.Now() const maxRows = 1024 // arbitrary but plenty @@ -428,7 +433,7 @@ func (s *Storage) fetchExpiredSessionIDs(ctx context.Context) ([]sqlliveness.Ses continue } if exp.Less(now) { - id, err := decodeSessionKey(rows[i].Key) + id, err := s.keyCodec.decode(rows[i].Key) if err != nil { log.Warningf(ctx, "failed to decode row %s session: %v", rows[i].Key.String(), err) } @@ -453,7 +458,10 @@ func (s *Storage) fetchExpiredSessionIDs(ctx context.Context) ([]sqlliveness.Ses func (s *Storage) Insert( ctx context.Context, sid sqlliveness.SessionID, expiration hlc.Timestamp, ) (err error) { - k := s.makeSessionKey(sid) + k, err := s.keyCodec.encode(sid) + if err != nil { + return err + } v := encodeValue(expiration) ctx = multitenant.WithTenantCostControlExemption(ctx) if err := s.db.InitPut(ctx, k, &v, true); err != nil { @@ -472,7 +480,10 @@ func (s *Storage) Update( ) (sessionExists bool, err error) { ctx = multitenant.WithTenantCostControlExemption(ctx) err = s.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - k := s.makeSessionKey(sid) + k, err := s.keyCodec.encode(sid) + if err != nil { + return err + } kv, err := txn.Get(ctx, k) if err != nil { return err @@ -514,36 +525,6 @@ func (s *cachedStorage) IsAlive( return (*Storage)(s).isAlive(ctx, sid, async) } -func (s *Storage) makeTablePrefix() roachpb.Key { - return s.codec.IndexPrefix(uint32(s.tableID), 1) -} - -func (s *Storage) makeSessionKey(id sqlliveness.SessionID) roachpb.Key { - return keys.MakeFamilyKey(encoding.EncodeBytesAscending(s.makeTablePrefix(), id.UnsafeBytes()), 0) -} - -func decodeSessionKey(k roachpb.Key) (sqlliveness.SessionID, error) { - prefix, err := keys.GetRowPrefixLength(k) - if err != nil { - return "", errors.Wrap(err, "failed to decode session key") - } - k = k[:prefix] - rem, _, err := keys.DecodeTenantPrefix(k) - if err != nil { - return "", errors.Wrap(err, "failed to decode tenant prefix from session key") - } - rem, _, _, err = keys.DecodeTableIDIndexID(rem) - if err != nil { - return "", errors.Wrap(err, "failed to decode table and index prefix from session key") - } - - _, idBytes, err := encoding.DecodeBytesAscending(rem, nil) - if err != nil { - return "", errors.Wrap(err, "failed to decode session ID from session key") - } - return sqlliveness.SessionID(idBytes), nil -} - func decodeValue(kv kv.KeyValue) (hlc.Timestamp, error) { tup, err := kv.Value.GetTuple() if err != nil { diff --git a/pkg/sql/sqlliveness/slstorage/slstorage_test.go b/pkg/sql/sqlliveness/slstorage/slstorage_test.go index c3cc028c8a83..1c99168c9f8e 100644 --- a/pkg/sql/sqlliveness/slstorage/slstorage_test.go +++ b/pkg/sql/sqlliveness/slstorage/slstorage_test.go @@ -13,6 +13,7 @@ package slstorage_test import ( "bytes" "context" + "fmt" "math/rand" "strings" "sync" @@ -27,11 +28,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" + "github.com/cockroachdb/cockroach/pkg/sql/enum" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slstorage" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -50,6 +53,16 @@ func TestStorage(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + t.Run("RegionalByRow", func(t *testing.T) { + defer envutil.TestSetEnv(t, "COCKROACH_MR_SYSTEM_DATABASE", "1")() + testStorage(t) + }) + t.Run("RegionalByTable", func(t *testing.T) { + defer envutil.TestSetEnv(t, "COCKROACH_MR_SYSTEM_DATABASE", "0")() + testStorage(t) + }) +} +func testStorage(t *testing.T) { ctx := context.Background() s, sqlDB, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) defer s.Stopper().Stop(ctx) @@ -60,12 +73,8 @@ func TestStorage(t *testing.T) { *hlc.Clock, *timeutil.ManualTime, *cluster.Settings, *stop.Stopper, *slstorage.Storage, ) { dbName := t.Name() - tDB.Exec(t, `CREATE DATABASE "`+dbName+`"`) - schema := strings.Replace(systemschema.SqllivenessTableSchema, - `CREATE TABLE system.sqlliveness`, - `CREATE TABLE "`+dbName+`".sqlliveness`, 1) - tDB.Exec(t, schema) - tableID := getTableID(t, tDB, dbName, "sqlliveness") + + tableID := newSqllivenessTable(t, tDB, dbName) timeSource := timeutil.NewManualTime(t0) clock := hlc.NewClock(timeSource, base.DefaultMaxClockOffset) @@ -73,7 +82,7 @@ func TestStorage(t *testing.T) { stopper := stop.NewStopper(stop.WithTracer(s.TracerI().(*tracing.Tracer))) var ambientCtx log.AmbientContext storage := slstorage.NewTestingStorage(ambientCtx, stopper, clock, kvDB, keys.SystemSQLCodec, settings, - tableID, timeSource.NewTimer) + tableID, rbrIndexID, timeSource.NewTimer) return clock, timeSource, settings, stopper, storage } @@ -83,7 +92,8 @@ func TestStorage(t *testing.T) { defer stopper.Stop(ctx) exp := clock.Now().Add(time.Second.Nanoseconds(), 0) - const id = "asdf" + id, err := slstorage.MakeSessionID(enum.One, uuid.MakeV4()) + require.NoError(t, err) metrics := storage.Metrics() { @@ -127,8 +137,10 @@ func TestStorage(t *testing.T) { // Create two records which will expire before nextGC. exp := clock.Now().Add(gcInterval.Nanoseconds()-1, 0) - const id1 = "asdf" - const id2 = "ghjk" + id1, err := slstorage.MakeSessionID(enum.One, uuid.MakeV4()) + require.NoError(t, err) + id2, err := slstorage.MakeSessionID(enum.One, uuid.MakeV4()) + require.NoError(t, err) { require.NoError(t, storage.Insert(ctx, id1, exp)) require.NoError(t, storage.Insert(ctx, id2, exp)) @@ -228,7 +240,8 @@ func TestStorage(t *testing.T) { storage.Start(ctx) exp := clock.Now().Add(time.Second.Nanoseconds(), 0) - const id = "asdf" + id, err := slstorage.MakeSessionID(enum.One, uuid.MakeV4()) + require.NoError(t, err) metrics := storage.Metrics() { @@ -308,19 +321,23 @@ func TestStorage(t *testing.T) { func TestConcurrentAccessesAndEvictions(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - + t.Run("RegionalByRow", func(t *testing.T) { + defer envutil.TestSetEnv(t, "COCKROACH_MR_SYSTEM_DATABASE", "1")() + testConcurrentAccessesAndEvictions(t) + }) + t.Run("RegionalByTable", func(t *testing.T) { + defer envutil.TestSetEnv(t, "COCKROACH_MR_SYSTEM_DATABASE", "0")() + testConcurrentAccessesAndEvictions(t) + }) +} +func testConcurrentAccessesAndEvictions(t *testing.T) { ctx := context.Background() s, sqlDB, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) defer s.Stopper().Stop(ctx) tDB := sqlutils.MakeSQLRunner(sqlDB) t0 := time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC) dbName := t.Name() - tDB.Exec(t, `CREATE DATABASE "`+dbName+`"`) - schema := strings.Replace(systemschema.SqllivenessTableSchema, - `CREATE TABLE system.sqlliveness`, - `CREATE TABLE "`+dbName+`".sqlliveness`, 1) - tDB.Exec(t, schema) - tableID := getTableID(t, tDB, dbName, "sqlliveness") + tableID := newSqllivenessTable(t, tDB, dbName) timeSource := timeutil.NewManualTime(t0) clock := hlc.NewClock(timeSource, base.DefaultMaxClockOffset) @@ -330,7 +347,7 @@ func TestConcurrentAccessesAndEvictions(t *testing.T) { slstorage.CacheSize.Override(ctx, &settings.SV, 10) var ambientCtx log.AmbientContext storage := slstorage.NewTestingStorage(ambientCtx, stopper, clock, kvDB, keys.SystemSQLCodec, settings, - tableID, timeSource.NewTimer) + tableID, rbrIndexID, timeSource.NewTimer) storage.Start(ctx) const ( @@ -355,8 +372,11 @@ func TestConcurrentAccessesAndEvictions(t *testing.T) { t.Helper() state.Lock() defer state.Unlock() + + sid, err := slstorage.MakeSessionID(enum.One, uuid.MakeV4()) + require.NoError(t, err) s := session{ - id: sqlliveness.SessionID(uuid.MakeV4().String()), + id: sid, expiration: clock.Now().Add(expiration.Nanoseconds(), 0), } require.NoError(t, storage.Insert(ctx, s.id, s.expiration)) @@ -454,7 +474,16 @@ func TestConcurrentAccessesAndEvictions(t *testing.T) { func TestConcurrentAccessSynchronization(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - + t.Run("RegionalByRow", func(t *testing.T) { + defer envutil.TestSetEnv(t, "COCKROACH_MR_SYSTEM_DATABASE", "1")() + testConcurrentAccessSynchronization(t) + }) + t.Run("RegionalByTable", func(t *testing.T) { + defer envutil.TestSetEnv(t, "COCKROACH_MR_SYSTEM_DATABASE", "0")() + testConcurrentAccessSynchronization(t) + }) +} +func testConcurrentAccessSynchronization(t *testing.T) { ctx := context.Background() type filterFunc = func(ctx context.Context, request *roachpb.BatchRequest) *roachpb.Error var requestFilter atomic.Value @@ -476,12 +505,7 @@ func TestConcurrentAccessSynchronization(t *testing.T) { t0 := time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC) dbName := t.Name() - tDB.Exec(t, `CREATE DATABASE "`+dbName+`"`) - schema := strings.Replace(systemschema.SqllivenessTableSchema, - `CREATE TABLE system.sqlliveness`, - `CREATE TABLE "`+dbName+`".sqlliveness`, 1) - tDB.Exec(t, schema) - tableID := getTableID(t, tDB, dbName, "sqlliveness") + tableID := newSqllivenessTable(t, tDB, dbName) timeSource := timeutil.NewManualTime(t0) clock := hlc.NewClock(timeSource, base.DefaultMaxClockOffset) @@ -491,7 +515,7 @@ func TestConcurrentAccessSynchronization(t *testing.T) { slstorage.CacheSize.Override(ctx, &settings.SV, 10) var ambientCtx log.AmbientContext storage := slstorage.NewTestingStorage(ambientCtx, stopper, clock, kvDB, keys.SystemSQLCodec, settings, - tableID, timeSource.NewTimer) + tableID, rbrIndexID, timeSource.NewTimer) storage.Start(ctx) // Synchronize reading from the store with the blocked channel by detecting @@ -530,7 +554,8 @@ func TestConcurrentAccessSynchronization(t *testing.T) { cached := storage.CachedReader() var alive bool var g errgroup.Group - sid := sqlliveness.SessionID(t.Name()) + sid, err := slstorage.MakeSessionID(enum.One, uuid.MakeV4()) + require.NoError(t, err) g.Go(func() (err error) { alive, err = cached.IsAlive(ctx, sid) return err @@ -559,7 +584,8 @@ func TestConcurrentAccessSynchronization(t *testing.T) { cached := storage.CachedReader() var alive bool var g errgroup.Group - sid := sqlliveness.SessionID(t.Name()) + sid, err := slstorage.MakeSessionID(enum.One, uuid.MakeV4()) + require.NoError(t, err) toCancel, cancel := context.WithCancel(ctx) before := storage.Metrics().IsAliveCacheMisses.Count() @@ -607,7 +633,8 @@ func TestConcurrentAccessSynchronization(t *testing.T) { cached := storage.CachedReader() var alive bool var g errgroup.Group - sid := sqlliveness.SessionID(t.Name()) + sid, err := slstorage.MakeSessionID(enum.One, uuid.MakeV4()) + require.NoError(t, err) g.Go(func() (err error) { alive, err = cached.IsAlive(ctx, sid) return err @@ -650,7 +677,16 @@ func TestConcurrentAccessSynchronization(t *testing.T) { func TestDeleteMidUpdateFails(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - + t.Run("RegionalByRow", func(t *testing.T) { + defer envutil.TestSetEnv(t, "COCKROACH_MR_SYSTEM_DATABASE", "1")() + testDeleteMidUpdateFails(t) + }) + t.Run("RegionalByTable", func(t *testing.T) { + defer envutil.TestSetEnv(t, "COCKROACH_MR_SYSTEM_DATABASE", "0")() + testDeleteMidUpdateFails(t) + }) +} +func testDeleteMidUpdateFails(t *testing.T) { ctx := context.Background() type filterFunc = func(context.Context, *roachpb.BatchRequest, *roachpb.BatchResponse) *roachpb.Error var respFilter atomic.Value @@ -674,22 +710,17 @@ func TestDeleteMidUpdateFails(t *testing.T) { // Set up a fake storage implementation using a separate table. dbName := t.Name() - tdb.Exec(t, `CREATE DATABASE "`+dbName+`"`) - schema := strings.Replace(systemschema.SqllivenessTableSchema, - `CREATE TABLE system.sqlliveness`, - `CREATE TABLE "`+dbName+`".sqlliveness`, 1) - tdb.Exec(t, schema) - tableID := getTableID(t, tdb, dbName, "sqlliveness") + tableID := newSqllivenessTable(t, tdb, dbName) storage := slstorage.NewTestingStorage( s.DB().AmbientContext, s.Stopper(), s.Clock(), kvDB, keys.SystemSQLCodec, s.ClusterSettings(), - tableID, timeutil.DefaultTimeSource{}.NewTimer, + tableID, rbrIndexID, timeutil.DefaultTimeSource{}.NewTimer, ) // Insert a session. - ID := sqlliveness.SessionID("foo") - require.NoError(t, storage.Insert(ctx, ID, s.Clock().Now())) + ID, err := slstorage.MakeSessionID(enum.One, uuid.MakeV4()) + require.NoError(t, err) // Install a filter which will send on this channel when we attempt // to perform an update after the get has evaluated. @@ -735,16 +766,32 @@ func TestDeleteMidUpdateFails(t *testing.T) { require.NoError(t, res.err) } -func getTableID( - t *testing.T, db *sqlutils.SQLRunner, dbName, tableName string, -) (tableID descpb.ID) { +// rbrIndexID is the index id used to access the regional by row index in +// tests. In production it will be index 2, but the freshly created test table +// will have index 1. +const rbrIndexID = 1 + +func newSqllivenessTable(t *testing.T, db *sqlutils.SQLRunner, dbName string) (tableID descpb.ID) { + var schema string + if systemschema.TestSupportMultiRegion() { + schema = systemschema.MrSqllivenessTableSchema + } else { + schema = systemschema.SqllivenessTableSchema + } t.Helper() + db.Exec(t, fmt.Sprintf(`CREATE DATABASE IF NOT EXISTS "%s"`, dbName)) + tableName := "sqlliveness" + schema = strings.Replace(schema, + fmt.Sprintf("CREATE TABLE system.%s", tableName), + fmt.Sprintf(`CREATE TABLE "%s".%s`, dbName, tableName), + 1) + db.Exec(t, schema) db.QueryRow(t, ` -select u.id - from system.namespace t - join system.namespace u - on t.id = u."parentID" - where t.name = $1 and u.name = $2`, + select u.id + from system.namespace t + join system.namespace u + on t.id = u."parentID" + where t.name = $1 and u.name = $2`, dbName, tableName).Scan(&tableID) return tableID }