Skip to content

Commit

Permalink
<sql>: long running migration and mixed cluster version test for gran…
Browse files Browse the repository at this point in the history
…t option

Release note (<category, see below>): <what> <show> <why>
  • Loading branch information
jackcwu committed Nov 19, 2021
1 parent 1461db2 commit 1943aa7
Show file tree
Hide file tree
Showing 15 changed files with 675 additions and 7 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -166,4 +166,4 @@ trace.debug.enable boolean false if set, traces for recent requests can be seen
trace.jaeger.agent string the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as <host>:<port>. If no port is specified, 6381 will be used.
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 21.2-10 set the active cluster version in the format '<major>.<minor>'
version version 21.2-12 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,6 @@
<tr><td><code>trace.jaeger.agent</code></td><td>string</td><td><code></code></td><td>the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as <host>:<port>. If no port is specified, 6381 will be used.</td></tr>
<tr><td><code>trace.opentelemetry.collector</code></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.2-10</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.2-12</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
4 changes: 0 additions & 4 deletions pkg/cmd/roachtest/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2299,8 +2299,6 @@ func (c *clusterImpl) Conn(ctx context.Context, node int) *gosql.DB {
// ConnE returns a SQL connection to the specified node.
func (c *clusterImpl) ConnE(ctx context.Context, node int) (*gosql.DB, error) {
urls, err := c.ExternalPGUrl(ctx, c.Node(node))
fmt.Println("THE URLS ARE:")
fmt.Println(urls)
if err != nil {
return nil, err
}
Expand All @@ -2317,9 +2315,7 @@ func (c *clusterImpl) ConnEUser(ctx context.Context, node int, user string) (*go
if err != nil {
return nil, err
}
//dataSourceName := fmt.Sprintf("%s?user=%s?password=%s", urls[0], user, password)
dataSourceName := strings.Replace(urls[0], "root", user, 1)
fmt.Println("datasourcename is: ", dataSourceName)
db, err := gosql.Open("postgres", dataSourceName)
if err != nil {
return nil, err
Expand Down
23 changes: 23 additions & 0 deletions pkg/cmd/roachtest/tests/privilege_version_upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ func runPrivilegeVersionUpgrade(
}

const currentVersion = ""
fmt.Println("<------------------------------------------------>")
fmt.Println("predecessor version is: ", predecessorVersion)
fmt.Println("build version is: ", buildVersion)
fmt.Println("mainVersion is: ", currentVersion)
fmt.Println("<------------------------------------------------>")
upgradeTest := func(
dbPrivs, schemaPrivs, tablePrivs, typePrivs privilege.List) *versionUpgradeTest {
steps := []versionStep{
Expand Down Expand Up @@ -74,12 +79,30 @@ func runPrivilegeVersionUpgrade(
steps = append(
steps,
// Roll nodes forward and finalize upgrade.
//printSQL("select version();", 1),
//printSQL("select crdb_internal.node_executable_version();", 1),
//printSQL("select version();", 2),
//printSQL("select crdb_internal.node_executable_version();", 2),
//printSQL("select version();", 3),
//printSQL("select crdb_internal.node_executable_version();", 3),
binaryUpgradeStep(c.Node(3), currentVersion),
binaryUpgradeStep(c.Node(1), currentVersion),
binaryUpgradeStep(c.Node(2), currentVersion),
//printSQL("select version();", 1),
//printSQL("select crdb_internal.node_executable_version();", 1),
//printSQL("select version();", 2),
//printSQL("select crdb_internal.node_executable_version();", 2),
//printSQL("select version();", 3),
//printSQL("select crdb_internal.node_executable_version();", 3),

allowAutoUpgradeStep(1),
waitForUpgradeStep(c.All()),
//printSQL("select version();", 1),
//printSQL("select crdb_internal.node_executable_version();", 1),
//printSQL("select version();", 2),
//printSQL("select crdb_internal.node_executable_version();", 2),
//printSQL("select version();", 3),
//printSQL("select crdb_internal.node_executable_version();", 3),

checkDatabasePrivilegesStep(dbPrivs),
checkSchemaPrivilegesStep(schemaPrivs),
Expand Down
147 changes: 147 additions & 0 deletions pkg/cmd/roachtest/tests/validate_grant_option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package tests

import (
"context"

"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/util/version"
"github.com/stretchr/testify/require"
)

func registerValidateGrantOption(r registry.Registry) {
r.Add(registry.TestSpec{
Name: "sql-experience/validate-grant-option",
Owner: registry.OwnerSQLExperience,
Cluster: r.MakeClusterSpec(3),
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runRegisterValidateGrantOption(ctx, t, c, *t.BuildVersion())
},
})
}

// execSQL executes the SQL statement as "root".
func execSQL(sqlStatement string, errorExpected bool, node int) versionStep {
return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
conn, err := u.c.ConnE(ctx, node)
require.NoError(t, err)
_, err = conn.Exec(sqlStatement)
if !errorExpected {
require.NoError(t, err)
} else {
require.Error(t, err)
}
}
}

// execSQLUser changes the current user from "root" to the one that is input and executes the
// SQL statement as that user.
func execSQLUser(sqlStatement string, user string, errorExpected bool, node int) versionStep {
return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
conn, err := u.c.ConnEUser(ctx, node, user)
require.NoError(t, err)
_, err = conn.Exec(sqlStatement)
if !errorExpected {
require.NoError(t, err)
} else {
require.Error(t, err)
}
}
}

// printSQL is used for debugging by outputting the result of a query to the
// console.
//func printSQL(sqlStatement string, node int, user string) versionStep {
// return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
// conn, err := u.c.ConnEUser(ctx, node, user)
// require.NoError(t, err)
// row := conn.QueryRow(sqlStatement)
// var version string
// row.Scan(&version)
// require.NoError(t, err)
// }
//}

func runRegisterValidateGrantOption(
ctx context.Context, t test.Test, c cluster.Cluster, buildVersion version.Version,
) {
const mainVersion = ""
predecessorVersion, err := PredecessorVersion(buildVersion)
if err != nil {
t.Fatal(err)
}

u := newVersionUpgradeTest(c,
uploadAndStart(c.All(), predecessorVersion),
waitForUpgradeStep(c.All()),
preventAutoUpgradeStep(1),
preventAutoUpgradeStep(2),
preventAutoUpgradeStep(3),

execSQL("CREATE USER foo;", false, 1),
execSQL("CREATE USER foo2;", false, 2),
execSQL("CREATE USER foo3;", false, 2),
execSQL("CREATE USER foo4;", false, 2),
execSQL("CREATE USER target;", false, 1),

execSQL("CREATE DATABASE d;", false, 2),
execSQL("CREATE SCHEMA s;", false, 3),
execSQL("CREATE TABLE t1();", false, 2),
execSQL("CREATE TYPE ty AS ENUM();", false, 3),

execSQL("GRANT ALL PRIVILEGES ON DATABASE d TO foo;", false, 1),
execSQL("GRANT ALL PRIVILEGES ON SCHEMA s TO foo2;", false, 3),
execSQL("GRANT ALL PRIVILEGES ON TABLE t1 TO foo3;", false, 1),
execSQL("GRANT ALL PRIVILEGES ON TYPE ty TO foo4;", false, 1),

execSQLUser("SELECT * FROM t1;", "foo3", false, 2),
execSQLUser("GRANT CREATE ON DATABASE d TO target;", "foo", false, 1),
execSQLUser("GRANT USAGE ON SCHEMA s TO target;", "foo2", false, 3),
execSQLUser("GRANT SELECT ON TABLE t1 TO target;", "foo3", false, 1),
execSQLUser("GRANT USAGE ON TYPE ty TO target;", "foo4", false, 1),

binaryUpgradeStep(c.Node(1), mainVersion),
allowAutoUpgradeStep(1),
execSQLUser("GRANT CREATE ON DATABASE d TO target;", "foo", false, 1),
execSQLUser("GRANT USAGE ON SCHEMA s TO target;", "foo2", false, 3),
execSQLUser("GRANT INSERT ON TABLE t1 TO target;", "foo3", false, 2),
execSQLUser("GRANT GRANT ON TYPE ty TO target;", "foo4", false, 2),

binaryUpgradeStep(c.Node(2), mainVersion),
allowAutoUpgradeStep(2),
execSQLUser("GRANT ALL PRIVILEGES ON DATABASE d TO target;", "foo", false, 3),
execSQLUser("GRANT ALL PRIVILEGES ON SCHEMA s TO target;", "foo2", false, 2),
execSQLUser("GRANT ALL PRIVILEGES ON TABLE t1 TO target;", "foo3", false, 1),
execSQLUser("GRANT ALL PRIVILEGES ON TYPE ty TO target;", "foo4", false, 1),
execSQLUser("GRANT GRANT ON DATABASE d TO foo2;", "foo", false, 1),
execSQLUser("GRANT GRANT ON SCHEMA s TO foo3;", "foo2", false, 1),
execSQLUser("GRANT GRANT, SELECT ON TABLE t1 TO foo4;", "foo3", false, 2),

binaryUpgradeStep(c.Node(3), mainVersion),
allowAutoUpgradeStep(3),
waitForUpgradeStep(c.All()), // ALL NODES ARE UPGRADED, THE MIGRATION HAS FINISHED
execSQLUser("GRANT DELETE ON TABLE t1 TO foo2;", "foo3", false, 3),
execSQLUser("GRANT ALL PRIVILEGES ON SCHEMA s TO foo3;", "target", false, 2),
execSQLUser("GRANT CREATE ON DATABASE d TO foo3;", "foo2", true, 2),
execSQLUser("GRANT USAGE ON SCHEMA s TO foo;", "foo3", true, 2),
execSQLUser("GRANT INSERT ON TABLE t1 TO foo;", "foo4", true, 1),
execSQLUser("GRANT SELECT ON TABLE t1 TO foo;", "foo4", false, 1),

execSQL("CREATE USER foo5;", false, 2),
execSQL("CREATE TABLE t2();", false, 3),
execSQLUser("GRANT ALL PRIVILEGES ON TABLE t2 TO foo5;", "root", false, 1),
execSQLUser("GRANT DELETE ON TABLE t2 TO foo2;", "foo5", true, 2),
)
u.run(ctx, t)
}
142 changes: 142 additions & 0 deletions pkg/migration/migrations/grant_option_migration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package migrations

import (
"context"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/migration"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/errors"
)

// grantOptionMigration iterates through every descriptor and sets a user's grant option bits
// equal to its privilege bits if it holds the "GRANT" privilege
func grantOptionMigration(
ctx context.Context, _ clusterversion.ClusterVersion, d migration.TenantDeps, _ *jobs.Job,
) error {
query := `SELECT id, descriptor, crdb_internal_mvcc_timestamp FROM system.descriptor ORDER BY ID ASC`
rows, err := d.InternalExecutor.QueryIterator(
ctx, "update-grant-options", nil /* txn */, query,
)
if err != nil {
return err
}

addGrantOptionFunc := func(ids []descpb.ID, descs []descpb.Descriptor, timestamps []hlc.Timestamp) error {
var modifiedDescs []catalog.MutableDescriptor
for i, id := range ids {
b := catalogkv.NewBuilderWithMVCCTimestamp(&descs[i], timestamps[i])
if b == nil {
return errors.Newf("unable to find descriptor for id %d", id)
}

err := b.RunMigrationOnlyChanges(ctx, nil /* DescGetter */) // RunPostDeserializationChanges previously
if err != nil {
return err
}
mutableDesc := b.BuildExistingMutable()

//if mutableDesc.HasPostDeserializationChanges() {
modifiedDescs = append(modifiedDescs, mutableDesc)
//}

}
if err := writeModifiedDescriptors(ctx, d, modifiedDescs); err != nil {
return err
}
return nil
}

return addGrantOptionMigration(ctx, rows, addGrantOptionFunc, 1<<19)
}

// addGrantOptionFunction is used in addGrantOptionMigration to maybe add grant options
// of descriptors specified by the id.
type addGrantOptionFunction func(ids []descpb.ID, descs []descpb.Descriptor, timestamps []hlc.Timestamp) error

// addGrantOptionMigration is an abstraction for adding grant options.
// The rows provided should be the result of a select ID, descriptor, crdb_internal_mvcc_timestamp
// from system.descriptor table.
// The datums returned from the query are parsed to grab the descpb.Descriptor
// and addGrantOptionFunction is called on the desc.
// If minBatchSizeInBytes is specified, fixDescriptors will only be called once the
// size of the descriptors in the id array surpasses minBatchSizeInBytes.
func addGrantOptionMigration(
ctx context.Context,
rows sqlutil.InternalRows,
grantOptionFunc addGrantOptionFunction,
minBatchSizeInBytes int,
) error {
defer func() { _ = rows.Close() }()
ok, err := rows.Next(ctx)
if err != nil {
return err
}
currSize := 0 // in bytes.
var ids []descpb.ID
var descs []descpb.Descriptor
var timestamps []hlc.Timestamp
for ; ok; ok, err = rows.Next(ctx) {
if err != nil {
return err
}
datums := rows.Cur()
id, desc, ts, err := unmarshalDescFromDescriptorRow(datums)
if err != nil {
return err
}
ids = append(ids, id)
descs = append(descs, desc)
timestamps = append(timestamps, ts)
currSize += desc.Size()
if currSize > minBatchSizeInBytes || minBatchSizeInBytes == 0 {
err = grantOptionFunc(ids, descs, timestamps)
if err != nil {
return err
}
// Reset size and id array after the batch is fixed.
currSize = 0
ids = nil
descs = nil
timestamps = nil
}
}
// Fix remaining descriptors.
return grantOptionFunc(ids, descs, timestamps)
}

// writeModifiedDescriptors writes the descriptors that we have given grant option privileges
// to back to batch
func writeModifiedDescriptors(
ctx context.Context, d migration.TenantDeps, modifiedDescs []catalog.MutableDescriptor,
) error {
return d.CollectionFactory.Txn(ctx, d.InternalExecutor, d.DB, func(
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) error {
batch := txn.NewBatch()
for _, desc := range modifiedDescs {
err := catalogkv.WriteDescToBatch(ctx, false, d.Settings, batch, d.Codec, desc.GetID(), desc)
if err != nil {
return err
}
}
return txn.Run(ctx, batch)
})
}
Loading

0 comments on commit 1943aa7

Please sign in to comment.