From 5265347498c2ebafc2a5e2e7832af28ceeb6678e Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Fri, 13 Jan 2023 09:02:58 -0500 Subject: [PATCH 1/4] changefeedccl: Rename and remove CDC functions This PR renames and removes CDC specific functions, while maintaining backward compatibility. * `cdc_is_delete()` function removed. It is replaced with `event_op()` function which returns a string describing the type of event. If the changefeed is running with `diff` option, then this function returns `insert`, `update`, or `delete`. If changefeed is running without the `diff` option, we can't tell an update from insert, so this function returns `upsert` or `delete`. * `cdc_mvcc_timestamp()` function removed. This information can be accessed via cockroach standard system column `crdb_internal_mvcc_timestamp`. The same timestamp column is available in the previous row state `(cdc_prev).crdb_internal_mvcc_timestamp` * `cdc_updated_timestamp()` function renamed as `event_schema_timestamp()` Fixes #92482 Epic: CRDB-17161 Release note (enterprise change): Deprecate and replace some of the changefeed specific functions made available in preview mode in 22.2 release. While these functions are deprecated, old changefeed transformations should continue to function properly. Customers are encouraged to closely monitor their changefeeds created during upgrade. While effort was made to maintain backward compatibility, the updated changefeed transformation may produce slightly different output (different column names, etc). 
--- pkg/ccl/changefeedccl/cdceval/compat.go | 45 +++-- pkg/ccl/changefeedccl/cdceval/compat_test.go | 12 ++ pkg/ccl/changefeedccl/cdceval/expr_eval.go | 38 +++- .../changefeedccl/cdceval/expr_eval_test.go | 22 +-- .../cdceval/func_resolver_test.go | 4 +- pkg/ccl/changefeedccl/cdceval/functions.go | 41 ++--- .../changefeedccl/cdceval/functions_test.go | 168 ++++++++++++------ .../changefeedccl/cdceval/validation_test.go | 2 +- pkg/ccl/changefeedccl/changefeed_test.go | 2 +- pkg/ccl/changefeedccl/event_processing.go | 9 +- 10 files changed, 229 insertions(+), 114 deletions(-) diff --git a/pkg/ccl/changefeedccl/cdceval/compat.go b/pkg/ccl/changefeedccl/cdceval/compat.go index ae38c8c7e312..6f5dbc9102be 100644 --- a/pkg/ccl/changefeedccl/cdceval/compat.go +++ b/pkg/ccl/changefeedccl/cdceval/compat.go @@ -9,6 +9,8 @@ package cdceval import ( + "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" + "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/errors" ) @@ -23,18 +25,39 @@ func RewritePreviewExpression(oldExpr *tree.SelectClause) (*tree.SelectClause, e default: return true, expr, nil case *tree.FuncExpr: - // Old style cdc_prev() function returned JSONb object. - // Replace this call with a call to row_to_json((cdc_prev).*) - if n, ok := t.Func.FunctionReference.(*tree.UnresolvedName); ok && n.Parts[0] == "cdc_prev" { - rowToJSON := &tree.FuncExpr{ - Func: tree.ResolvableFunctionReference{ - FunctionReference: tree.FunDefs["row_to_json"], - }, - Exprs: tree.Exprs{ - &tree.TupleStar{Expr: tree.NewUnresolvedName("cdc_prev")}, - }, + if n, ok := t.Func.FunctionReference.(*tree.UnresolvedName); ok { + switch n.Parts[0] { + case "cdc_prev": + // Old style cdc_prev() function returned JSONb object. 
+ // Replace this call with a call to row_to_json((cdc_prev).*) + rowToJSON := &tree.FuncExpr{ + Func: tree.ResolvableFunctionReference{ + FunctionReference: tree.FunDefs["row_to_json"], + }, + Exprs: tree.Exprs{ + &tree.TupleStar{Expr: tree.NewUnresolvedName("cdc_prev")}, + }, + } + return true, rowToJSON, nil + case "cdc_is_delete": + // Old style cdc_is_delete returned boolean; use event_op instead. + newExpr, err := parser.ParseExpr("(event_op() = 'delete')") + if err != nil { + return false, expr, err + } + return true, newExpr, nil + case "cdc_mvcc_timestamp": + // Old cdc_mvcc_timestamp() function gets replaced with crdb_intenral_mvcc_timestamp column. + return true, tree.NewUnresolvedName(colinfo.MVCCTimestampColumnName), nil + case "cdc_updated_timestamp": + // Old cdc_updated_timestamp gets replaced with event_schema_timestamp. + newExpr := &tree.FuncExpr{ + Func: tree.ResolvableFunctionReference{ + FunctionReference: tree.NewUnresolvedName("changefeed_event_schema_timestamp"), + }, + } + return true, newExpr, nil } - return true, rowToJSON, nil } } return true, expr, nil diff --git a/pkg/ccl/changefeedccl/cdceval/compat_test.go b/pkg/ccl/changefeedccl/cdceval/compat_test.go index 8d6c5c7c4a24..1b0371bb29d2 100644 --- a/pkg/ccl/changefeedccl/cdceval/compat_test.go +++ b/pkg/ccl/changefeedccl/cdceval/compat_test.go @@ -64,6 +64,18 @@ func TestCompatRewrite(t *testing.T) { oldExpr: "SELECT foo.*, cdc_prev() FROM " + fooRef + " WHERE cdc_prev()->>'field' = 'blah'", newExpr: "SELECT foo.*, row_to_json((cdc_prev).*) FROM " + fooRef + " WHERE (row_to_json((cdc_prev).*)->>'field') = 'blah'", }, + { + oldExpr: "SELECT foo.*, cdc_is_delete() FROM " + fooRef, + newExpr: "SELECT foo.*, (event_op() = 'delete') FROM " + fooRef, + }, + { + oldExpr: "SELECT foo.*, cdc_is_delete() AS was_deleted FROM " + fooRef, + newExpr: "SELECT foo.*, (event_op() = 'delete') AS was_deleted FROM " + fooRef, + }, + { + oldExpr: "SELECT foo.*, cdc_mvcc_timestamp() FROM " + fooRef, + 
newExpr: "SELECT foo.*, crdb_internal_mvcc_timestamp FROM " + fooRef, + }, } { sc, err := ParseChangefeedExpression(tc.oldExpr) require.NoError(t, err) diff --git a/pkg/ccl/changefeedccl/cdceval/expr_eval.go b/pkg/ccl/changefeedccl/cdceval/expr_eval.go index 2d72a8e77cc6..9027e2f0ba0a 100644 --- a/pkg/ccl/changefeedccl/cdceval/expr_eval.go +++ b/pkg/ccl/changefeedccl/cdceval/expr_eval.go @@ -41,8 +41,8 @@ type Evaluator struct { execCfg *sql.ExecutorConfig user username.SQLUsername sessionData sessiondatapb.SessionData - - familyEval map[descpb.FamilyID]*familyEvaluator + withDiff bool + familyEval map[descpb.FamilyID]*familyEvaluator } // familyEvaluator is a responsible for evaluating expressions in CDC @@ -82,6 +82,7 @@ func NewEvaluator( user username.SQLUsername, sd sessiondatapb.SessionData, statementTS hlc.Timestamp, + withDiff bool, ) *Evaluator { return &Evaluator{ sc: sc, @@ -89,6 +90,7 @@ func NewEvaluator( user: user, sessionData: sd, statementTS: statementTS, + withDiff: withDiff, familyEval: make(map[descpb.FamilyID]*familyEvaluator, 1), // usually, just 1 family. } } @@ -101,6 +103,7 @@ func newFamilyEvaluator( user username.SQLUsername, sd sessiondatapb.SessionData, statementTS hlc.Timestamp, + withDiff bool, ) *familyEvaluator { e := familyEvaluator{ targetFamilyID: targetFamilyID, @@ -113,6 +116,8 @@ func newFamilyEvaluator( rowCh: make(chan tree.Datums, 1), } e.rowEvalCtx.startTime = statementTS + e.rowEvalCtx.withDiff = withDiff + // Arrange to be notified when event does not match predicate. 
predicateAsProjection(e.norm) @@ -143,7 +148,9 @@ func (e *Evaluator) Eval( fe, ok := e.familyEval[updatedRow.FamilyID] if !ok { - fe = newFamilyEvaluator(e.sc, updatedRow.FamilyID, e.execCfg, e.user, e.sessionData, e.statementTS) + fe = newFamilyEvaluator( + e.sc, updatedRow.FamilyID, e.execCfg, e.user, e.sessionData, e.statementTS, e.withDiff, + ) e.familyEval[updatedRow.FamilyID] = fe } @@ -183,7 +190,7 @@ func (e *familyEvaluator) eval( } // Setup context. - if err := e.setupContextForRow(ctx, updatedRow); err != nil { + if err := e.setupContextForRow(ctx, updatedRow, prevRow); err != nil { return cdcevent.Row{}, err } @@ -447,9 +454,28 @@ func (e *familyEvaluator) copyPrevRow(prev cdcevent.Row) error { // setupContextForRow configures evaluation context with the provided row // information. -func (e *familyEvaluator) setupContextForRow(ctx context.Context, updated cdcevent.Row) error { +func (e *familyEvaluator) setupContextForRow( + ctx context.Context, updated cdcevent.Row, prevRow cdcevent.Row, +) error { e.rowEvalCtx.ctx = ctx e.rowEvalCtx.updatedRow = updated + + if updated.IsDeleted() { + e.rowEvalCtx.op = eventTypeDelete + } else { + // Insert or update. + if e.rowEvalCtx.withDiff { + if prevRow.IsInitialized() { + e.rowEvalCtx.op = eventTypeUpdate + } else { + e.rowEvalCtx.op = eventTypeInsert + } + } else { + // Without diff option we can't tell insert from update; so, use upsert. 
+ e.rowEvalCtx.op = eventTypeUpsert + } + } + return nil } @@ -478,7 +504,9 @@ func (e *familyEvaluator) closeErr() error { type rowEvalContext struct { ctx context.Context startTime hlc.Timestamp + withDiff bool updatedRow cdcevent.Row + op tree.Datum } // cdcAnnotationAddr is the address used to store relevant information diff --git a/pkg/ccl/changefeedccl/cdceval/expr_eval_test.go b/pkg/ccl/changefeedccl/cdceval/expr_eval_test.go index c5b721d159d4..3d4d23d72597 100644 --- a/pkg/ccl/changefeedccl/cdceval/expr_eval_test.go +++ b/pkg/ccl/changefeedccl/cdceval/expr_eval_test.go @@ -11,6 +11,7 @@ package cdceval import ( "context" "fmt" + "math/rand" "sort" "strconv" "testing" @@ -34,7 +35,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/stretchr/testify/require" ) @@ -145,15 +145,15 @@ $$`) "INSERT INTO foo (a, b) VALUES (2, '2nd test')", "DELETE FROM foo WHERE a=2 AND b='2nd test'", }, - stmt: "SELECT *, cdc_is_delete() FROM foo WHERE 'hello' != 'world'", + stmt: "SELECT *, event_op() = 'delete' AS deleted FROM foo WHERE 'hello' != 'world'", expectMainFamily: []decodeExpectation{ { keyValues: []string{"2nd test", "2"}, - allValues: map[string]string{"a": "2", "b": "2nd test", "e": "inactive", "cdc_is_delete": "false"}, + allValues: map[string]string{"a": "2", "b": "2nd test", "e": "inactive", "deleted": "false"}, }, { keyValues: []string{"2nd test", "2"}, - allValues: map[string]string{"a": "2", "b": "2nd test", "e": "NULL", "cdc_is_delete": "true"}, + allValues: map[string]string{"a": "2", "b": "2nd test", "e": "NULL", "deleted": "true"}, }, }, }, @@ -356,7 +356,7 @@ $$`) "INSERT INTO foo (a, b) VALUES (123, 'select_if')", "DELETE FROM foo where a=123", }, - stmt: "SELECT IF(cdc_is_delete(),'deleted',a::string) AS conditional FROM foo", + stmt: "SELECT IF(event_op() = 
'delete','deleted',a::string) AS conditional FROM foo", expectMainFamily: []decodeExpectation{ { keyValues: []string{"select_if", "123"}, @@ -388,7 +388,7 @@ $$`) actions: []string{ "INSERT INTO foo (a, b, h) VALUES (1, 'hello', 'invisible')", }, - stmt: "SELECT a, tableoid, h FROM foo WHERE crdb_internal_mvcc_timestamp = cdc_mvcc_timestamp()", + stmt: "SELECT a, tableoid, h FROM foo WHERE crdb_internal_mvcc_timestamp > 0", expectMainFamily: []decodeExpectation{ { keyValues: []string{"hello", "1"}, @@ -404,7 +404,7 @@ $$`) actions: []string{ "INSERT INTO foo (a, b, h) VALUES (1, 'hello', 'invisible')", }, - stmt: "SELECT a, tableoid, h FROM foo WHERE crdb_internal_mvcc_timestamp = cdc_mvcc_timestamp()", + stmt: "SELECT a, tableoid, h FROM foo WHERE crdb_internal_mvcc_timestamp > 0", expectErr: `column "h" does not exist`, }, { @@ -496,7 +496,7 @@ $$`) "SELECT x, 'only_some_deleted_values', x::string FROM s", }, actions: []string{"DELETE FROM foo WHERE b='only_some_deleted_values'"}, - stmt: `SELECT * FROM foo WHERE cdc_is_delete() AND (cdc_prev).a % 33 = 0`, + stmt: `SELECT * FROM foo WHERE event_op() = 'delete' AND (cdc_prev).a % 33 = 0`, expectMainFamily: repeatExpectation(decodeExpectation{expectUnwatchedErr: true}, 100), expectOnlyCFamily: func() (expectations []decodeExpectation) { for i := 1; i <= 100; i++ { @@ -722,10 +722,9 @@ func slurpValues(t *testing.T, r cdcevent.Row) map[string]string { } func randEncDatumPrimaryFamily( - t *testing.T, desc catalog.TableDescriptor, + t *testing.T, rng *rand.Rand, desc catalog.TableDescriptor, ) (row rowenc.EncDatumRow) { t.Helper() - rng, _ := randutil.NewTestRand() family, err := catalog.MustFindFamilyByID(desc, 0 /* id */) require.NoError(t, err) @@ -781,8 +780,9 @@ func newEvaluatorWithNormCheck( return nil, err } + const withDiff = true return NewEvaluator(norm.SelectClause, execCfg, username.RootUserName(), - defaultDBSessionData, hlc.Timestamp{}), nil + defaultDBSessionData, hlc.Timestamp{}, withDiff), nil } var 
defaultDBSessionData = sessiondatapb.SessionData{ diff --git a/pkg/ccl/changefeedccl/cdceval/func_resolver_test.go b/pkg/ccl/changefeedccl/cdceval/func_resolver_test.go index c5cb17ebde26..060b04ee8004 100644 --- a/pkg/ccl/changefeedccl/cdceval/func_resolver_test.go +++ b/pkg/ccl/changefeedccl/cdceval/func_resolver_test.go @@ -60,12 +60,12 @@ $$`) }, { testName: "cdc name without schema", - fnName: tree.MakeUnresolvedName("cdc_mvcc_timestamp"), + fnName: tree.MakeUnresolvedName("changefeed_creation_timestamp"), expectedSchema: "public", }, { testName: "uppercase cdc name without schema", - fnName: tree.MakeUnresolvedName("cdc_mVCC_timeStamp"), + fnName: tree.MakeUnresolvedName("changefeed_creATIon_TimeStamp"), expectedSchema: "public", }, { diff --git a/pkg/ccl/changefeedccl/cdceval/functions.go b/pkg/ccl/changefeedccl/cdceval/functions.go index 58c4c5fcd88e..ebd5cccbaadc 100644 --- a/pkg/ccl/changefeedccl/cdceval/functions.go +++ b/pkg/ccl/changefeedccl/cdceval/functions.go @@ -90,38 +90,26 @@ var cdcFunctions = map[string]*tree.ResolvedFunctionDefinition{ //"st_asgeojson", //"st_estimatedextent", - // NB: even though some cdc functions appear to be stable (e.g. cdc_is_delete()), + // NB: even though some cdc functions appear to be stable (e.g. event_op()), // we should not mark custom CDC functions as stable. Doing so will cause // optimizer to (constant) fold this function during optimization step -- something // we definitely don't want to do because we need to evaluate those functions // for each event. 
- "cdc_is_delete": makeCDCBuiltIn( - "cdc_is_delete", + "event_op": makeCDCBuiltIn( + "event_op", tree.Overload{ Types: tree.ParamTypes{}, - ReturnType: tree.FixedReturnType(types.Bool), + ReturnType: tree.FixedReturnType(types.String), Fn: func(ctx context.Context, evalCtx *eval.Context, datums tree.Datums) (tree.Datum, error) { rowEvalCtx := rowEvalContextFromEvalContext(evalCtx) - if rowEvalCtx.updatedRow.IsDeleted() { - return tree.DBoolTrue, nil - } - return tree.DBoolFalse, nil + return rowEvalCtx.op, nil }, - Info: "Returns true if the event is a deletion", + Info: "Returns 'insert', 'update', 'upsert' or 'delete' to describe the type of the operation.", Volatility: volatility.Volatile, }), - "cdc_mvcc_timestamp": cdcTimestampBuiltin( - "cdc_mvcc_timestamp", - "Returns MVCC timestamp of the event", - volatility.Volatile, - types.Decimal, - func(rowEvalCtx *rowEvalContext) hlc.Timestamp { - return rowEvalCtx.updatedRow.MvccTimestamp - }, - ), - "cdc_updated_timestamp": cdcTimestampBuiltin( - "cdc_updated_timestamp", - "Returns schema timestamp of the event", + "event_schema_timestamp": cdcTimestampBuiltin( + "event_schema_timestamp", + "Returns schema timestamp of the event.", volatility.Volatile, types.Decimal, func(rowEvalCtx *rowEvalContext) hlc.Timestamp { @@ -130,7 +118,7 @@ var cdcFunctions = map[string]*tree.ResolvedFunctionDefinition{ ), "changefeed_creation_timestamp": cdcTimestampBuiltin( "changefeed_creation_timestamp", - "Returns changefeed creation time", + "Returns changefeed creation time.", volatility.Stable, types.Decimal, func(rowEvalCtx *rowEvalContext) hlc.Timestamp { @@ -139,6 +127,13 @@ var cdcFunctions = map[string]*tree.ResolvedFunctionDefinition{ ), } +var ( + eventTypeInsert = tree.NewDString("insert") + eventTypeUpdate = tree.NewDString("update") + eventTypeUpsert = tree.NewDString("upsert") + eventTypeDelete = tree.NewDString("delete") +) + const cdcFnCategory = "CDC builtin" var cdcFnProps = &tree.FunctionProperties{ @@ -253,7 
+248,7 @@ func init() { case volatility.Stable: // If the stable function is not on the white list, // then it is blacklisted. - if _, whitelisted := cdcFunctions[fnDef.Name]; !whitelisted { + if _, allowed := cdcFunctions[fnDef.Name]; !allowed { functionDenyList[fnDef.Name] = struct{}{} } } diff --git a/pkg/ccl/changefeedccl/cdceval/functions_test.go b/pkg/ccl/changefeedccl/cdceval/functions_test.go index 0abb6b23a70f..840c091a5199 100644 --- a/pkg/ccl/changefeedccl/cdceval/functions_test.go +++ b/pkg/ccl/changefeedccl/cdceval/functions_test.go @@ -20,6 +20,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" + "github.com/cockroachdb/cockroach/pkg/sql/randgen" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" @@ -74,8 +77,7 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) { for fn, preferredOverload := range map[string]preferredFn{ "statement_timestamp": expectTSTZ, "transaction_timestamp": expectTSTZ, - "cdc_mvcc_timestamp": expectHLC, - "cdc_updated_timestamp": expectHLC, + "event_schema_timestamp": expectHLC, "changefeed_creation_timestamp": expectHLC, } { t.Run(fn, func(t *testing.T) { @@ -85,22 +87,21 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) { targetTS := rowTS switch fn { - case "cdc_updated_timestamp": + case "event_schema_timestamp": targetTS = schemaTS case "changefeed_creation_timestamp": targetTS = createTS } // We'll run tests against some future time stamp to ensure // that time functions use correct values. 
- testRow := makeEventRow(t, desc, schemaTS, false, rowTS) - e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, - fmt.Sprintf("SELECT "+ - "%[1]s() AS preferred,"+ // Preferred overload. - "%[1]s():::TIMESTAMPTZ AS tstz,"+ // Force timestamptz overload. - "%[1]s():::TIMESTAMP AS ts,"+ // Force timestamp overload. - "%[1]s():::DECIMAL AS dec,"+ // Force decimal overload. - "%[1]s()::STRING AS str"+ // Casts preferred overload to string. - " FROM foo", fn)) + testRow := makeEventRow(t, desc, schemaTS, false, rowTS, false) + e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, false, fmt.Sprintf("SELECT "+ + "%[1]s() AS preferred,"+ // Preferred overload. + "%[1]s():::TIMESTAMPTZ AS tstz,"+ // Force timestamptz overload. + "%[1]s():::TIMESTAMP AS ts,"+ // Force timestamp overload. + "%[1]s():::DECIMAL AS dec,"+ // Force decimal overload. + "%[1]s()::STRING AS str"+ // Casts preferred overload to string. + " FROM foo", fn)) require.NoError(t, err) defer e.Close() e.statementTS = createTS @@ -132,7 +133,7 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) { // should have no bearing on the returned values -- we should see // the same thing we saw before. updatedExpectations = initialExpectations - case "cdc_updated_timestamp": + case "event_schema_timestamp": targetTS = testRow.SchemaTS fallthrough default: @@ -158,10 +159,9 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) { // timezone. Since we don't do any special setup with session data, the // default timezone is UTC. We'll use a "strange" timezone of -1h33m from // UTC to test conversion. 
- testRow := makeEventRow(t, desc, s.Clock().Now(), false, futureTS) - e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, - fmt.Sprintf("SELECT timezone('+01:33:00', '%s'::time) FROM foo", - futureTS.GoTime().Format("15:04:05"))) + testRow := makeEventRow(t, desc, s.Clock().Now(), false, futureTS, false) + e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, false, fmt.Sprintf("SELECT timezone('+01:33:00', '%s'::time) FROM foo", + futureTS.GoTime().Format("15:04:05"))) require.NoError(t, err) defer e.Close() @@ -174,21 +174,73 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) { }) }) - t.Run("cdc_is_delete", func(t *testing.T) { + t.Run("event_op", func(t *testing.T) { schemaTS := s.Clock().Now() - testRow := makeEventRow(t, desc, schemaTS, false, s.Clock().Now()) - e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, - "SELECT cdc_is_delete() FROM foo") - require.NoError(t, err) - defer e.Close() + row := makeEventRow(t, desc, schemaTS, false, s.Clock().Now(), true) + deletedRow := makeEventRow(t, desc, schemaTS, true, s.Clock().Now(), true) + nilRow := cdcevent.Row{} + + for _, tc := range []struct { + op string + row cdcevent.Row + prevRow cdcevent.Row + withDiff bool + expect string + }{ + { + op: "insert", + row: row, + prevRow: nilRow, + withDiff: true, + expect: "insert", + }, + { + op: "update", + row: row, + prevRow: row, + withDiff: true, + expect: "update", + }, + { + // Without diff, we can't tell an update from insert, so we emit upsert. + op: "insert", + row: row, + prevRow: nilRow, + withDiff: false, + expect: "upsert", + }, + { + // Without diff, we can't tell an update from insert, so we emit upsert. 
+ op: "update", + row: row, + prevRow: nilRow, + withDiff: false, + expect: "upsert", + }, + { + op: "delete", + row: deletedRow, + prevRow: row, + withDiff: true, + expect: "delete", + }, + { + op: "delete", + row: deletedRow, + prevRow: row, + withDiff: false, + expect: "delete", + }, + } { + t.Run(fmt.Sprintf("%s/diff=%t", tc.op, tc.withDiff), func(t *testing.T) { + e, err := newEvaluator(&execCfg, &semaCtx, tc.row.EventDescriptor, tc.withDiff, "SELECT event_op() FROM foo") + require.NoError(t, err) + defer e.Close() - for _, expectDelete := range []bool{true, false} { - testRow := makeEventRow(t, desc, schemaTS, expectDelete, s.Clock().Now()) - p, err := e.Eval(ctx, testRow, cdcevent.Row{}) - require.NoError(t, err) - require.Equal(t, - map[string]string{"cdc_is_delete": fmt.Sprintf("%t", expectDelete)}, - slurpValues(t, p)) + p, err := e.Eval(ctx, tc.row, tc.prevRow) + require.NoError(t, err) + require.Equal(t, map[string]string{"event_op": tc.expect}, slurpValues(t, p)) + }) } }) @@ -200,10 +252,9 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) { } t.Run("pg_collation_for", func(t *testing.T) { - testRow := makeEventRow(t, desc, s.Clock().Now(), false, s.Clock().Now()) + testRow := makeEventRow(t, desc, s.Clock().Now(), false, s.Clock().Now(), false) - e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, - `SELECT pg_collation_for('hello' COLLATE de_DE) AS col FROM foo`) + e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, false, `SELECT pg_collation_for('hello' COLLATE de_DE) AS col FROM foo`) require.NoError(t, err) defer e.Close() @@ -214,11 +265,10 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) { for _, fn := range []string{"to_json", "to_jsonb"} { t.Run(fn, func(t *testing.T) { - testRow := makeEventRow(t, desc, s.Clock().Now(), false, s.Clock().Now()) + testRow := makeEventRow(t, desc, s.Clock().Now(), false, s.Clock().Now(), false) rowDatums := testRow.EncDatums() - e, err := newEvaluator(&execCfg, 
&semaCtx, testRow.EventDescriptor, - fmt.Sprintf("SELECT %s(a) FROM foo", fn)) + e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, false, fmt.Sprintf("SELECT %s(a) FROM foo", fn)) require.NoError(t, err) defer e.Close() @@ -231,11 +281,10 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) { } t.Run("row_to_json", func(t *testing.T) { - testRow := makeEventRow(t, desc, s.Clock().Now(), false, s.Clock().Now()) + testRow := makeEventRow(t, desc, s.Clock().Now(), false, s.Clock().Now(), false) rowDatums := testRow.EncDatums() - e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, - "SELECT row_to_json(row(a, b, c)) FROM foo") + e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, false, "SELECT row_to_json(row(a, b, c)) FROM foo") require.NoError(t, err) defer e.Close() @@ -251,10 +300,9 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) { }) t.Run("jsonb_build_array", func(t *testing.T) { - testRow := makeEventRow(t, desc, s.Clock().Now(), false, s.Clock().Now()) + testRow := makeEventRow(t, desc, s.Clock().Now(), false, s.Clock().Now(), false) rowDatums := testRow.EncDatums() - e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, - "SELECT jsonb_build_array(a, a, 42) AS three_ints FROM foo") + e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, false, "SELECT jsonb_build_array(a, a, 42) AS three_ints FROM foo") require.NoError(t, err) defer e.Close() @@ -271,10 +319,9 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) { }) t.Run("jsonb_build_object", func(t *testing.T) { - testRow := makeEventRow(t, desc, s.Clock().Now(), false, s.Clock().Now()) + testRow := makeEventRow(t, desc, s.Clock().Now(), false, s.Clock().Now(), false) rowDatums := testRow.EncDatums() - e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, - "SELECT jsonb_build_object('a', a, 'b', b, 'c', c) AS obj FROM foo") + e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, false, 
"SELECT jsonb_build_object('a', a, 'b', b, 'c', c) AS obj FROM foo") require.NoError(t, err) defer e.Close() @@ -293,9 +340,8 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) { // These functions have overloads; call the one that's stable overload // (i.e. one that needs to convert types.Any to string). t.Run(fn, func(t *testing.T) { - testRow := makeEventRow(t, desc, s.Clock().Now(), false, s.Clock().Now()) - e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, - fmt.Sprintf("SELECT %s(42) FROM foo", fn)) + testRow := makeEventRow(t, desc, s.Clock().Now(), false, s.Clock().Now(), false) + e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, false, fmt.Sprintf("SELECT %s(42) FROM foo", fn)) require.NoError(t, err) defer e.Close() @@ -309,10 +355,9 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) { // overlaps has many overloads; most of them are immutable, but 1 is stable. t.Run("overlaps", func(t *testing.T) { - testRow := makeEventRow(t, desc, s.Clock().Now(), false, s.Clock().Now()) + testRow := makeEventRow(t, desc, s.Clock().Now(), false, s.Clock().Now(), false) - e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, - `SELECT overlaps(transaction_timestamp(), interval '0', transaction_timestamp(), interval '-1s') FROM foo`) + e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, false, `SELECT overlaps(transaction_timestamp(), interval '0', transaction_timestamp(), interval '-1s') FROM foo`) require.NoError(t, err) defer e.Close() @@ -324,7 +369,7 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) { // Test that cdc specific functions correctly resolve overload, and that an // error is returned when cdc function called with wrong arguments. 
t.Run("cdc function errors", func(t *testing.T) { - testRow := makeEventRow(t, desc, s.Clock().Now(), false, s.Clock().Now()) + testRow := makeEventRow(t, desc, s.Clock().Now(), false, s.Clock().Now(), false) // currently, all cdc functions take no args, so call these functions with // some arguments. rng, _ := randutil.NewTestRand() @@ -343,8 +388,7 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) { // Run this test only for CDC specific functions. if def != useDefaultBuiltin && def.Overloads[0].FunctionProperties.Category == cdcFnCategory { t.Run(fn, func(t *testing.T) { - e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, - fmt.Sprintf("SELECT %s(%s) FROM foo", fn, fnArgs())) + e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, false, fmt.Sprintf("SELECT %s(%s) FROM foo", fn, fnArgs())) require.NoError(t, err) _, err = e.Eval(ctx, testRow, testRow) require.Regexp(t, "unknown signature", err) @@ -360,9 +404,15 @@ func makeEventRow( schemaTS hlc.Timestamp, deleted bool, mvccTS hlc.Timestamp, + includeMVCCCol bool, ) cdcevent.Row { t.Helper() - datums := randEncDatumPrimaryFamily(t, desc) + rng, _ := randutil.NewTestRand() + + datums := randEncDatumPrimaryFamily(t, rng, desc) + if includeMVCCCol { + datums = append(datums, rowenc.EncDatum{Datum: randgen.RandDatum(rng, colinfo.MVCCTimestampColumnType, false)}) + } r := cdcevent.TestingMakeEventRow(desc, 0, datums, deleted) r.SchemaTS = schemaTS r.MvccTimestamp = mvccTS @@ -370,7 +420,11 @@ func makeEventRow( } func newEvaluator( - execCfg *sql.ExecutorConfig, semaCtx *tree.SemaContext, ed *cdcevent.EventDescriptor, expr string, + execCfg *sql.ExecutorConfig, + semaCtx *tree.SemaContext, + ed *cdcevent.EventDescriptor, + withDiff bool, + expr string, ) (*Evaluator, error) { sc, err := ParseChangefeedExpression(expr) if err != nil { @@ -383,5 +437,5 @@ func newEvaluator( return nil, err } return NewEvaluator(norm.SelectClause, execCfg, username.RootUserName(), - 
defaultDBSessionData, execCfg.Clock.Now()), nil + defaultDBSessionData, execCfg.Clock.Now(), withDiff), nil } diff --git a/pkg/ccl/changefeedccl/cdceval/validation_test.go b/pkg/ccl/changefeedccl/cdceval/validation_test.go index 085336eee89a..a4e7a4c33911 100644 --- a/pkg/ccl/changefeedccl/cdceval/validation_test.go +++ b/pkg/ccl/changefeedccl/cdceval/validation_test.go @@ -276,7 +276,7 @@ func TestSelectClauseRequiresPrev(t *testing.T) { { name: "nested call to cdc_prev", desc: descs[`foo`], - stmt: "SELECT jsonb_build_object('op',IF(cdc_is_delete(),'u',IF(row_to_json((cdc_prev).*)::string='null','c','u'))) from foo", + stmt: "SELECT jsonb_build_object('op',IF(event_op() = 'delete','u',IF(row_to_json((cdc_prev).*)::string='null','c','u'))) from foo", requiresPrev: true, }, { diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 593d16301426..8b25add26221 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -7075,7 +7075,7 @@ INSERT INTO foo (a, b, e) VALUES (2, 'two', 'closed'); CREATE CHANGEFEED WITH schema_change_policy='stop' AS SELECT * FROM `+fromClause+` -WHERE e IN ('open', 'closed') AND NOT cdc_is_delete()`) +WHERE e IN ('open', 'closed') AND event_op() != 'delete'`) defer closeFeed(t, feed) assertPayloads(t, feed, []string{ diff --git a/pkg/ccl/changefeedccl/event_processing.go b/pkg/ccl/changefeedccl/event_processing.go index 5046b5e49d6f..f52b47c88b4f 100644 --- a/pkg/ccl/changefeedccl/event_processing.go +++ b/pkg/ccl/changefeedccl/event_processing.go @@ -231,7 +231,7 @@ func newKVEventToRowConsumer( var evaluator *cdceval.Evaluator if spec.Select.Expr != "" { - evaluator, err = newEvaluator(ctx, cfg, spec) + evaluator, err = newEvaluator(ctx, cfg, spec, details.Opts.GetFilters().WithDiff) if err != nil { return nil, err } @@ -260,7 +260,10 @@ func newKVEventToRowConsumer( } func newEvaluator( - ctx context.Context, cfg *sql.ExecutorConfig, spec 
execinfrapb.ChangeAggregatorSpec, + ctx context.Context, + cfg *sql.ExecutorConfig, + spec execinfrapb.ChangeAggregatorSpec, + withDiff bool, ) (*cdceval.Evaluator, error) { sc, err := cdceval.ParseChangefeedExpression(spec.Select.Expr) if err != nil { @@ -290,7 +293,7 @@ func newEvaluator( sd = *spec.Feed.SessionData } - return cdceval.NewEvaluator(sc, cfg, spec.User(), sd, spec.Feed.StatementTime), nil + return cdceval.NewEvaluator(sc, cfg, spec.User(), sd, spec.Feed.StatementTime, withDiff), nil } func (c *kvEventToRowConsumer) topicForEvent(eventMeta cdcevent.Metadata) (TopicDescriptor, error) { From db011d3ae18a0c05e89f6e83fb0689ebdd95e624 Mon Sep 17 00:00:00 2001 From: Jane Xing Date: Mon, 13 Feb 2023 10:10:40 -0500 Subject: [PATCH 2/4] parser: add `QUERY` to `bare_label_keyword` As revealed by #84309, some keywords not added to `bare_label_keywords` in `sql.y` will make the `SELECT ... ` statement error out, which is not compatible with postgres. This commit is to add the `QUERY` keyword per a support ticket. We're not adding the whole `unreserved_keyword` list here as having some of them in `bare_label_keyword`, such as `DAY`, bring `reduce` errors. Postgres: ``` postgres=# select substring('stringstringstring',1,10) query; query ------------ stringstri (1 row) ``` CRDB: ``` root@localhost:26257/defaultdb> select substring('stringstringstring',1,10) query; invalid syntax: statement ignored: at or near "query": syntax error SQLSTATE: 42601 DETAIL: source SQL: select substring('stringstringstring',1,10) query ^ ``` informs #84309 Release Note(bug fix): fixed the syntax error for `SELECT ... QUERY` (without AS) statement. 
--- docs/generated/sql/bnf/stmt_block.bnf | 1 + pkg/sql/parser/sql.y | 5 +++-- pkg/sql/parser/testdata/select_clauses | 8 ++++++++ 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/docs/generated/sql/bnf/stmt_block.bnf b/docs/generated/sql/bnf/stmt_block.bnf index a091739d23f0..8d45e20e3a6c 100644 --- a/docs/generated/sql/bnf/stmt_block.bnf +++ b/docs/generated/sql/bnf/stmt_block.bnf @@ -3426,6 +3426,7 @@ bare_label_keywords ::= | 'INVOKER' | 'LEAKPROOF' | 'PARALLEL' + | 'QUERY' | 'RETURN' | 'RETURNS' | 'SECURITY' diff --git a/pkg/sql/parser/sql.y b/pkg/sql/parser/sql.y index eb3a743c4f76..ba35bdc4a8e4 100644 --- a/pkg/sql/parser/sql.y +++ b/pkg/sql/parser/sql.y @@ -15904,13 +15904,13 @@ unrestricted_name: | reserved_keyword // Keyword category lists. Generally, every keyword present in the Postgres -// grammar should appear in exactly one of these lists. +// grammar should appear in exactly one of these "x_keyword" lists. // // Put a new keyword into the first list that it can go into without causing // shift or reduce conflicts. The earlier lists define "less reserved" // categories of keywords. // -// Note: also add the new keyword to `bare_label` list to not break +// Note: also add the **new** keyword to `bare_label_keywords` list to not break // user queries using column label without `AS`. // "Unreserved" keywords --- available for use as any kind of name. 
unreserved_keyword: @@ -16374,6 +16374,7 @@ bare_label_keywords: | INVOKER | LEAKPROOF | PARALLEL +| QUERY | RETURN | RETURNS | SECURITY diff --git a/pkg/sql/parser/testdata/select_clauses b/pkg/sql/parser/testdata/select_clauses index 787eea3adf91..33b90402fe53 100644 --- a/pkg/sql/parser/testdata/select_clauses +++ b/pkg/sql/parser/testdata/select_clauses @@ -3051,3 +3051,11 @@ SELECT * FROM ROWS FROM (json_to_record('')) AS t (a INT8, b STRING, c foo) -- n SELECT (*) FROM ROWS FROM ((json_to_record(('')))) AS t (a INT8, b STRING, c foo) -- fully parenthesized SELECT * FROM ROWS FROM (json_to_record('_')) AS t (a INT8, b STRING, c foo) -- literals removed SELECT * FROM ROWS FROM (json_to_record('')) AS _ (_ INT8, _ STRING, _ foo) -- identifiers removed + +parse +SELECT substring('stringstringstring',1,10) QUERY +---- +SELECT substring('stringstringstring', 1, 10) AS query -- normalized! +SELECT (substring(('stringstringstring'), (1), (10))) AS query -- fully parenthesized +SELECT substring('_', _, _) AS query -- literals removed +SELECT substring('stringstringstring', 1, 10) AS _ -- identifiers removed From 204ad978bf11be5d199323d5e68e7d7742c527b7 Mon Sep 17 00:00:00 2001 From: Radu Berinde Date: Tue, 14 Feb 2023 10:40:20 -0800 Subject: [PATCH 3/4] storage: include version in benchmark fixture directory Some storage-related benchmarks leave fixtures around in `.gitignore`d directories. There is potential for fixtures from old versions to cause failures. We are seeing some CI failures that would be explained by this (we're not sure yet if it's possible). In any case, it is better to include the version in the fixture name to avoid this (which can be a problem even locally). The logging around the fixture location is also improved.
Informs #97061 Release note: None Epic: none --- pkg/ccl/storageccl/engineccl/.gitignore | 4 ++++ pkg/ccl/storageccl/engineccl/BUILD.bazel | 1 + pkg/ccl/storageccl/engineccl/bench_test.go | 14 ++++++++++++-- .../batcheval/cmd_refresh_range_bench_test.go | 17 ++++++++++++++--- pkg/kv/kvserver/rangefeed/BUILD.bazel | 1 + .../rangefeed/catchup_scan_bench_test.go | 13 ++++++++++--- pkg/storage/bench_data_test.go | 16 +++++++++++++--- pkg/storage/open.go | 8 -------- 8 files changed, 55 insertions(+), 19 deletions(-) diff --git a/pkg/ccl/storageccl/engineccl/.gitignore b/pkg/ccl/storageccl/engineccl/.gitignore index 2b05ecacc733..77e451b2ff7a 100644 --- a/pkg/ccl/storageccl/engineccl/.gitignore +++ b/pkg/ccl/storageccl/engineccl/.gitignore @@ -1,3 +1,7 @@ # Do not add environment-specific entries here (see the top-level .gitignore # for reasoning and alternatives). + +# Old benchmark data. mvcc_data +# New benchmark data. +mvcc_data_* diff --git a/pkg/ccl/storageccl/engineccl/BUILD.bazel b/pkg/ccl/storageccl/engineccl/BUILD.bazel index bbaed697b3f8..831b2dbee1f2 100644 --- a/pkg/ccl/storageccl/engineccl/BUILD.bazel +++ b/pkg/ccl/storageccl/engineccl/BUILD.bazel @@ -43,6 +43,7 @@ go_test( "//pkg/base", "//pkg/ccl/baseccl", "//pkg/ccl/storageccl/engineccl/enginepbccl", + "//pkg/clusterversion", "//pkg/keys", "//pkg/roachpb", "//pkg/settings/cluster", diff --git a/pkg/ccl/storageccl/engineccl/bench_test.go b/pkg/ccl/storageccl/engineccl/bench_test.go index 08cde2494f20..3bbd0b656c15 100644 --- a/pkg/ccl/storageccl/engineccl/bench_test.go +++ b/pkg/ccl/storageccl/engineccl/bench_test.go @@ -16,6 +16,7 @@ import ( "path/filepath" "testing" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -43,10 +44,13 @@ import ( // The creation of the database is time consuming, so the caller can choose // whether to use a temporary or 
permanent location. func loadTestData( - dir string, numKeys, numBatches, batchTimeSpan, valueBytes int, + dirPrefix string, numKeys, numBatches, batchTimeSpan, valueBytes int, ) (storage.Engine, error) { ctx := context.Background() + verStr := fmt.Sprintf("v%s", clusterversion.TestingBinaryVersion.String()) + dir := fmt.Sprintf("%s_v%s_%d_%d_%d_%d", dirPrefix, verStr, numKeys, numBatches, batchTimeSpan, valueBytes) + exists := true if _, err := os.Stat(dir); oserror.IsNotExist(err) { exists = false @@ -60,12 +64,18 @@ func loadTestData( return nil, err } + absPath, err := filepath.Abs(dir) + if err != nil { + absPath = dir + } + if exists { + log.Infof(context.Background(), "using existing test data: %s", absPath) testutils.ReadAllFiles(filepath.Join(dir, "*")) return eng, nil } - log.Infof(context.Background(), "creating test data: %s", dir) + log.Infof(context.Background(), "creating test data: %s", absPath) // Generate the same data every time. rng := rand.New(rand.NewSource(1449168817)) diff --git a/pkg/kv/kvserver/batcheval/cmd_refresh_range_bench_test.go b/pkg/kv/kvserver/batcheval/cmd_refresh_range_bench_test.go index e77ca18dd056..a3350b564586 100644 --- a/pkg/kv/kvserver/batcheval/cmd_refresh_range_bench_test.go +++ b/pkg/kv/kvserver/batcheval/cmd_refresh_range_bench_test.go @@ -19,6 +19,7 @@ import ( "testing" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -210,6 +211,9 @@ func setupMVCCPebble(b testing.TB, dir string, lBaseMaxBytes int64, readOnly boo func setupData( ctx context.Context, b *testing.B, emk engineMaker, opts benchDataOptions, ) (storage.Engine, string) { + // Include the current version in the fixture name, or we may inadvertently + // run against a left-over fixture that is no longer supported. 
+ verStr := fmt.Sprintf("v%s", clusterversion.TestingBinaryVersion.String()) orderStr := "linear" if opts.randomKeyOrder { orderStr = "random" @@ -218,8 +222,9 @@ func setupData( if opts.readOnlyEngine { readOnlyStr = "_readonly" } - loc := fmt.Sprintf("refresh_range_bench_data_%s%s_%d_%d_%d", - orderStr, readOnlyStr, opts.numKeys, opts.valueBytes, opts.lBaseMaxBytes) + loc := fmt.Sprintf("refresh_range_bench_data_%s_%s%s_%d_%d_%d", + verStr, orderStr, readOnlyStr, opts.numKeys, opts.valueBytes, opts.lBaseMaxBytes) + exists := true if _, err := os.Stat(loc); oserror.IsNotExist(err) { exists = false @@ -227,13 +232,19 @@ func setupData( b.Fatal(err) } + absPath, err := filepath.Abs(loc) + if err != nil { + absPath = loc + } + if exists { + log.Infof(ctx, "using existing refresh range benchmark data: %s", absPath) testutils.ReadAllFiles(filepath.Join(loc, "*")) return emk(b, loc, opts.lBaseMaxBytes, opts.readOnlyEngine), loc } eng := emk(b, loc, opts.lBaseMaxBytes, false) - log.Infof(ctx, "creating refresh range benchmark data: %s", loc) + log.Infof(ctx, "creating refresh range benchmark data: %s", absPath) // Generate the same data every time. 
rng := rand.New(rand.NewSource(1449168817)) diff --git a/pkg/kv/kvserver/rangefeed/BUILD.bazel b/pkg/kv/kvserver/rangefeed/BUILD.bazel index 8c6a9a265804..9c566a82403c 100644 --- a/pkg/kv/kvserver/rangefeed/BUILD.bazel +++ b/pkg/kv/kvserver/rangefeed/BUILD.bazel @@ -55,6 +55,7 @@ go_test( embed = [":rangefeed"], deps = [ "//pkg/base", + "//pkg/clusterversion", "//pkg/keys", "//pkg/roachpb", "//pkg/settings/cluster", diff --git a/pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go b/pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go index f22ba5c3bc32..f1be451858b9 100644 --- a/pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go +++ b/pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go @@ -19,6 +19,7 @@ import ( "testing" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -229,6 +230,7 @@ func setupMVCCPebble(b testing.TB, dir string, lBaseMaxBytes int64, readOnly boo func setupData( ctx context.Context, b *testing.B, emk engineMaker, opts benchDataOptions, ) (storage.Engine, string) { + verStr := fmt.Sprintf("v%s", clusterversion.TestingBinaryVersion.String()) orderStr := "linear" if opts.randomKeyOrder { orderStr = "random" @@ -237,8 +239,8 @@ func setupData( if opts.readOnlyEngine { readOnlyStr = "_readonly" } - loc := fmt.Sprintf("rangefeed_bench_data_%s%s_%d_%d_%d_%d", - orderStr, readOnlyStr, opts.numKeys, opts.valueBytes, opts.lBaseMaxBytes, opts.numRangeKeys) + loc := fmt.Sprintf("rangefeed_bench_data_%s_%s%s_%d_%d_%d_%d", + verStr, orderStr, readOnlyStr, opts.numKeys, opts.valueBytes, opts.lBaseMaxBytes, opts.numRangeKeys) exists := true if _, err := os.Stat(loc); oserror.IsNotExist(err) { exists = false @@ -246,13 +248,18 @@ func setupData( b.Fatal(err) } + absPath, err := filepath.Abs(loc) + if err != nil { + absPath = loc + } if exists { 
+ log.Infof(ctx, "using existing refresh range benchmark data: %s", absPath) testutils.ReadAllFiles(filepath.Join(loc, "*")) return emk(b, loc, opts.lBaseMaxBytes, opts.readOnlyEngine), loc } eng := emk(b, loc, opts.lBaseMaxBytes, false) - log.Infof(ctx, "creating rangefeed benchmark data: %s", loc) + log.Infof(ctx, "creating rangefeed benchmark data: %s", absPath) // Generate the same data every time. rng := rand.New(rand.NewSource(1449168817)) diff --git a/pkg/storage/bench_data_test.go b/pkg/storage/bench_data_test.go index 57bb9c586c94..5a4f8a7aa412 100644 --- a/pkg/storage/bench_data_test.go +++ b/pkg/storage/bench_data_test.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/errors/oserror" + "github.com/cockroachdb/pebble" "github.com/cockroachdb/pebble/vfs" "github.com/stretchr/testify/require" ) @@ -38,7 +39,7 @@ type initialState interface { Base() initialState // Key returns a unique sequence of strings that uniquely identifies the - // represented initial condtions. Key is used as the cache key for reusing + // represented initial conditions. Key is used as the cache key for reusing // databases computed by previous runs, so all configuration must be fully // represented in Key's return value. Key() []string @@ -58,6 +59,14 @@ type engineWithLocation struct { Location } +// TODO(jackson): Tie this to the mapping in SetMinVersion. +var latestReleaseFormatMajorVersion = pebble.FormatPrePebblev1Marked // v22.2 + +var latestReleaseFormatMajorVersionOpt ConfigOption = func(cfg *engineConfig) error { + cfg.PebbleConfig.Opts.FormatMajorVersion = latestReleaseFormatMajorVersion + return nil +} + // getInitialStateEngine constructs an Engine with an initial database // state necessary for a benchmark. The initial states are cached on the // filesystem to avoid expensive reconstruction when possible. 
The return value @@ -89,7 +98,7 @@ func getInitialStateEngine( opts := append([]ConfigOption{ MustExist, - LatestReleaseFormatMajorVersion, + latestReleaseFormatMajorVersionOpt, }, initial.ConfigOptions()...) if !inMemory { @@ -149,7 +158,7 @@ func buildInitialState( e.Close() buildFS = e.Location.fs } else { - opts := append([]ConfigOption{LatestReleaseFormatMajorVersion}, initial.ConfigOptions()...) + opts := append([]ConfigOption{latestReleaseFormatMajorVersionOpt}, initial.ConfigOptions()...) // Regardless of whether the initial conditions specify an in-memory engine // or not, we build the conditions using an in-memory engine for @@ -240,6 +249,7 @@ var _ initialState = mvccBenchData{} func (d mvccBenchData) Key() []string { key := []string{ "mvcc", + fmt.Sprintf("fmtver_%d", latestReleaseFormatMajorVersion), fmt.Sprintf("numKeys_%d", d.numKeys), fmt.Sprintf("numVersions_%d", d.numVersions), fmt.Sprintf("valueBytes_%d", d.valueBytes), diff --git a/pkg/storage/open.go b/pkg/storage/open.go index 92285225bfa5..871daf61fb1e 100644 --- a/pkg/storage/open.go +++ b/pkg/storage/open.go @@ -167,14 +167,6 @@ func Hook(hookFunc func(*base.StorageConfig) error) ConfigOption { } } -// LatestReleaseFormatMajorVersion opens the database already upgraded to the -// latest release's format major version. -var LatestReleaseFormatMajorVersion ConfigOption = func(cfg *engineConfig) error { - // TODO(jackson): Tie the below to the mapping in SetMinVersion. - cfg.PebbleConfig.Opts.FormatMajorVersion = pebble.FormatPrePebblev1Marked // v22.2 - return nil -} - // If enables the given option if enable is true. 
func If(enable bool, opt ConfigOption) ConfigOption { if enable { From 93f134a9bd74d237e8905a0660e24c7fa65af849 Mon Sep 17 00:00:00 2001 From: Andy Yang Date: Tue, 14 Feb 2023 17:12:02 -0500 Subject: [PATCH 4/4] sql: add missing STORING clause in system.privileges user ID migration This patch fixes a discrepancy between the bootstrap schema and user ID migration for the system.privileges table. Release note: None --- pkg/upgrade/upgrades/system_privileges_user_id_migration.go | 2 +- .../upgrades/system_privileges_user_id_migration_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/upgrade/upgrades/system_privileges_user_id_migration.go b/pkg/upgrade/upgrades/system_privileges_user_id_migration.go index c5127ee5a92a..0ee5a5d612a7 100644 --- a/pkg/upgrade/upgrades/system_privileges_user_id_migration.go +++ b/pkg/upgrade/upgrades/system_privileges_user_id_migration.go @@ -28,7 +28,7 @@ FAMILY "primary" ` const createUniqueIndexOnUserIDAndPathOnSystemPrivilegesStmt = ` -CREATE UNIQUE INDEX IF NOT EXISTS privileges_path_user_id_key ON system.privileges (path ASC, user_id ASC) +CREATE UNIQUE INDEX IF NOT EXISTS privileges_path_user_id_key ON system.privileges (path, user_id) STORING (privileges, grant_options) ` func alterSystemPrivilegesAddUserIDColumn( diff --git a/pkg/upgrade/upgrades/system_privileges_user_id_migration_test.go b/pkg/upgrade/upgrades/system_privileges_user_id_migration_test.go index ed29cf7e500c..d1c3501b8a73 100644 --- a/pkg/upgrade/upgrades/system_privileges_user_id_migration_test.go +++ b/pkg/upgrade/upgrades/system_privileges_user_id_migration_test.go @@ -119,7 +119,7 @@ func runTestSystemPrivilegesUserIDMigration(t *testing.T, numUsers int) { grant_options STRING[] NOT NULL, user_id OID NULL, CONSTRAINT "primary" PRIMARY KEY (username ASC, path ASC), - UNIQUE INDEX privileges_path_user_id_key (path ASC, user_id ASC) + UNIQUE INDEX privileges_path_user_id_key (path ASC, user_id ASC) STORING (privileges, grant_options) 
)` r := tdb.QueryRow(t, "SELECT create_statement FROM [SHOW CREATE TABLE system.privileges]") var actualSchema string