diff --git a/pkg/ccl/changefeedccl/cdceval/compat.go b/pkg/ccl/changefeedccl/cdceval/compat.go index ae38c8c7e312..6f5dbc9102be 100644 --- a/pkg/ccl/changefeedccl/cdceval/compat.go +++ b/pkg/ccl/changefeedccl/cdceval/compat.go @@ -9,6 +9,8 @@ package cdceval import ( + "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" + "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/errors" ) @@ -23,18 +25,39 @@ func RewritePreviewExpression(oldExpr *tree.SelectClause) (*tree.SelectClause, e default: return true, expr, nil case *tree.FuncExpr: - // Old style cdc_prev() function returned JSONb object. - // Replace this call with a call to row_to_json((cdc_prev).*) - if n, ok := t.Func.FunctionReference.(*tree.UnresolvedName); ok && n.Parts[0] == "cdc_prev" { - rowToJSON := &tree.FuncExpr{ - Func: tree.ResolvableFunctionReference{ - FunctionReference: tree.FunDefs["row_to_json"], - }, - Exprs: tree.Exprs{ - &tree.TupleStar{Expr: tree.NewUnresolvedName("cdc_prev")}, - }, + if n, ok := t.Func.FunctionReference.(*tree.UnresolvedName); ok { + switch n.Parts[0] { + case "cdc_prev": + // Old style cdc_prev() function returned JSONb object. + // Replace this call with a call to row_to_json((cdc_prev).*) + rowToJSON := &tree.FuncExpr{ + Func: tree.ResolvableFunctionReference{ + FunctionReference: tree.FunDefs["row_to_json"], + }, + Exprs: tree.Exprs{ + &tree.TupleStar{Expr: tree.NewUnresolvedName("cdc_prev")}, + }, + } + return true, rowToJSON, nil + case "cdc_is_delete": + // Old style cdc_is_delete returned boolean; use event_op instead. + newExpr, err := parser.ParseExpr("(event_op() = 'delete')") + if err != nil { + return false, expr, err + } + return true, newExpr, nil + case "cdc_mvcc_timestamp": + // Old cdc_mvcc_timestamp() function gets replaced with crdb_intenral_mvcc_timestamp column. + return true, tree.NewUnresolvedName(colinfo.MVCCTimestampColumnName), nil + case "cdc_updated_timestamp": + // Old cdc_updated_timestamp gets replaced with event_schema_timestamp. + newExpr := &tree.FuncExpr{ + Func: tree.ResolvableFunctionReference{ + FunctionReference: tree.NewUnresolvedName("changefeed_event_schema_timestamp"), + }, + } + return true, newExpr, nil } - return true, rowToJSON, nil } } return true, expr, nil diff --git a/pkg/ccl/changefeedccl/cdceval/compat_test.go b/pkg/ccl/changefeedccl/cdceval/compat_test.go index 8d6c5c7c4a24..1b0371bb29d2 100644 --- a/pkg/ccl/changefeedccl/cdceval/compat_test.go +++ b/pkg/ccl/changefeedccl/cdceval/compat_test.go @@ -64,6 +64,18 @@ func TestCompatRewrite(t *testing.T) { oldExpr: "SELECT foo.*, cdc_prev() FROM " + fooRef + " WHERE cdc_prev()->>'field' = 'blah'", newExpr: "SELECT foo.*, row_to_json((cdc_prev).*) FROM " + fooRef + " WHERE (row_to_json((cdc_prev).*)->>'field') = 'blah'", }, + { + oldExpr: "SELECT foo.*, cdc_is_delete() FROM " + fooRef, + newExpr: "SELECT foo.*, (event_op() = 'delete') FROM " + fooRef, + }, + { + oldExpr: "SELECT foo.*, cdc_is_delete() AS was_deleted FROM " + fooRef, + newExpr: "SELECT foo.*, (event_op() = 'delete') AS was_deleted FROM " + fooRef, + }, + { + oldExpr: "SELECT foo.*, cdc_mvcc_timestamp() FROM " + fooRef, + newExpr: "SELECT foo.*, crdb_internal_mvcc_timestamp FROM " + fooRef, + }, } { sc, err := ParseChangefeedExpression(tc.oldExpr) require.NoError(t, err) diff --git a/pkg/ccl/changefeedccl/cdceval/expr_eval.go b/pkg/ccl/changefeedccl/cdceval/expr_eval.go index 2d72a8e77cc6..9027e2f0ba0a 100644 --- a/pkg/ccl/changefeedccl/cdceval/expr_eval.go +++ b/pkg/ccl/changefeedccl/cdceval/expr_eval.go @@ -41,8 +41,8 @@ type Evaluator struct { execCfg *sql.ExecutorConfig user username.SQLUsername sessionData sessiondatapb.SessionData - - familyEval map[descpb.FamilyID]*familyEvaluator + withDiff bool + familyEval map[descpb.FamilyID]*familyEvaluator } // familyEvaluator is a responsible for evaluating expressions in CDC @@ -82,6 +82,7 @@ func NewEvaluator( user username.SQLUsername, sd sessiondatapb.SessionData, statementTS hlc.Timestamp, + withDiff bool, ) *Evaluator { return &Evaluator{ sc: sc, @@ -89,6 +90,7 @@ func NewEvaluator( user: user, sessionData: sd, statementTS: statementTS, + withDiff: withDiff, familyEval: make(map[descpb.FamilyID]*familyEvaluator, 1), // usually, just 1 family. } } @@ -101,6 +103,7 @@ func newFamilyEvaluator( user username.SQLUsername, sd sessiondatapb.SessionData, statementTS hlc.Timestamp, + withDiff bool, ) *familyEvaluator { e := familyEvaluator{ targetFamilyID: targetFamilyID, @@ -113,6 +116,8 @@ func newFamilyEvaluator( rowCh: make(chan tree.Datums, 1), } e.rowEvalCtx.startTime = statementTS + e.rowEvalCtx.withDiff = withDiff + // Arrange to be notified when event does not match predicate. predicateAsProjection(e.norm) @@ -143,7 +148,9 @@ func (e *Evaluator) Eval( fe, ok := e.familyEval[updatedRow.FamilyID] if !ok { - fe = newFamilyEvaluator(e.sc, updatedRow.FamilyID, e.execCfg, e.user, e.sessionData, e.statementTS) + fe = newFamilyEvaluator( + e.sc, updatedRow.FamilyID, e.execCfg, e.user, e.sessionData, e.statementTS, e.withDiff, + ) e.familyEval[updatedRow.FamilyID] = fe } @@ -183,7 +190,7 @@ func (e *familyEvaluator) eval( } // Setup context. - if err := e.setupContextForRow(ctx, updatedRow); err != nil { + if err := e.setupContextForRow(ctx, updatedRow, prevRow); err != nil { return cdcevent.Row{}, err } @@ -447,9 +454,28 @@ func (e *familyEvaluator) copyPrevRow(prev cdcevent.Row) error { // setupContextForRow configures evaluation context with the provided row // information. -func (e *familyEvaluator) setupContextForRow(ctx context.Context, updated cdcevent.Row) error { +func (e *familyEvaluator) setupContextForRow( + ctx context.Context, updated cdcevent.Row, prevRow cdcevent.Row, +) error { e.rowEvalCtx.ctx = ctx e.rowEvalCtx.updatedRow = updated + + if updated.IsDeleted() { + e.rowEvalCtx.op = eventTypeDelete + } else { + // Insert or update. + if e.rowEvalCtx.withDiff { + if prevRow.IsInitialized() { + e.rowEvalCtx.op = eventTypeUpdate + } else { + e.rowEvalCtx.op = eventTypeInsert + } + } else { + // Without diff option we can't tell insert from update; so, use upsert. + e.rowEvalCtx.op = eventTypeUpsert + } + } + return nil } @@ -478,7 +504,9 @@ func (e *familyEvaluator) closeErr() error { type rowEvalContext struct { ctx context.Context startTime hlc.Timestamp + withDiff bool updatedRow cdcevent.Row + op tree.Datum } // cdcAnnotationAddr is the address used to store relevant information diff --git a/pkg/ccl/changefeedccl/cdceval/expr_eval_test.go b/pkg/ccl/changefeedccl/cdceval/expr_eval_test.go index c5b721d159d4..3d4d23d72597 100644 --- a/pkg/ccl/changefeedccl/cdceval/expr_eval_test.go +++ b/pkg/ccl/changefeedccl/cdceval/expr_eval_test.go @@ -11,6 +11,7 @@ package cdceval import ( "context" "fmt" + "math/rand" "sort" "strconv" "testing" @@ -34,7 +35,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/stretchr/testify/require" ) @@ -145,15 +145,15 @@ $$`) "INSERT INTO foo (a, b) VALUES (2, '2nd test')", "DELETE FROM foo WHERE a=2 AND b='2nd test'", }, - stmt: "SELECT *, cdc_is_delete() FROM foo WHERE 'hello' != 'world'", + stmt: "SELECT *, event_op() = 'delete' AS deleted FROM foo WHERE 'hello' != 'world'", expectMainFamily: []decodeExpectation{ { keyValues: []string{"2nd test", "2"}, - allValues: map[string]string{"a": "2", "b": "2nd test", "e": "inactive", "cdc_is_delete": "false"}, + allValues: map[string]string{"a": "2", "b": "2nd test", "e": "inactive", "deleted": "false"}, }, { keyValues: []string{"2nd test", "2"}, - allValues: map[string]string{"a": "2", "b": "2nd test", "e": "NULL", "cdc_is_delete": "true"}, + allValues: map[string]string{"a": "2", "b": "2nd test", "e": "NULL", "deleted": "true"}, }, }, }, @@ -356,7 +356,7 @@ $$`) "INSERT INTO foo (a, b) VALUES (123, 'select_if')", "DELETE FROM foo where a=123", }, - stmt: "SELECT IF(cdc_is_delete(),'deleted',a::string) AS conditional FROM foo", + stmt: "SELECT IF(event_op() = 'delete','deleted',a::string) AS conditional FROM foo", expectMainFamily: []decodeExpectation{ { keyValues: []string{"select_if", "123"}, @@ -388,7 +388,7 @@ $$`) actions: []string{ "INSERT INTO foo (a, b, h) VALUES (1, 'hello', 'invisible')", }, - stmt: "SELECT a, tableoid, h FROM foo WHERE crdb_internal_mvcc_timestamp = cdc_mvcc_timestamp()", + stmt: "SELECT a, tableoid, h FROM foo WHERE crdb_internal_mvcc_timestamp > 0", expectMainFamily: []decodeExpectation{ { keyValues: []string{"hello", "1"}, @@ -404,7 +404,7 @@ $$`) actions: []string{ "INSERT INTO foo (a, b, h) VALUES (1, 'hello', 'invisible')", }, - stmt: "SELECT a, tableoid, h FROM foo WHERE crdb_internal_mvcc_timestamp = cdc_mvcc_timestamp()", + stmt: "SELECT a, tableoid, h FROM foo WHERE crdb_internal_mvcc_timestamp > 0", expectErr: `column "h" does not exist`, }, { @@ -496,7 +496,7 @@ $$`) "SELECT x, 'only_some_deleted_values', x::string FROM s", }, actions: []string{"DELETE FROM foo WHERE b='only_some_deleted_values'"}, - stmt: `SELECT * FROM foo WHERE cdc_is_delete() AND (cdc_prev).a % 33 = 0`, + stmt: `SELECT * FROM foo WHERE event_op() = 'delete' AND (cdc_prev).a % 33 = 0`, expectMainFamily: repeatExpectation(decodeExpectation{expectUnwatchedErr: true}, 100), expectOnlyCFamily: func() (expectations []decodeExpectation) { for i := 1; i <= 100; i++ { @@ -722,10 +722,9 @@ func slurpValues(t *testing.T, r cdcevent.Row) map[string]string { } func randEncDatumPrimaryFamily( - t *testing.T, desc catalog.TableDescriptor, + t *testing.T, rng *rand.Rand, desc catalog.TableDescriptor, ) (row rowenc.EncDatumRow) { t.Helper() - rng, _ := randutil.NewTestRand() family, err := catalog.MustFindFamilyByID(desc, 0 /* id */) require.NoError(t, err) @@ -781,8 +780,9 @@ func newEvaluatorWithNormCheck( return nil, err } + const withDiff = true return NewEvaluator(norm.SelectClause, execCfg, username.RootUserName(), - defaultDBSessionData, hlc.Timestamp{}), nil + defaultDBSessionData, hlc.Timestamp{}, withDiff), nil } var defaultDBSessionData = sessiondatapb.SessionData{ diff --git a/pkg/ccl/changefeedccl/cdceval/func_resolver_test.go b/pkg/ccl/changefeedccl/cdceval/func_resolver_test.go index c5cb17ebde26..060b04ee8004 100644 --- a/pkg/ccl/changefeedccl/cdceval/func_resolver_test.go +++ b/pkg/ccl/changefeedccl/cdceval/func_resolver_test.go @@ -60,12 +60,12 @@ $$`) }, { testName: "cdc name without schema", - fnName: tree.MakeUnresolvedName("cdc_mvcc_timestamp"), + fnName: tree.MakeUnresolvedName("changefeed_creation_timestamp"), expectedSchema: "public", }, { testName: "uppercase cdc name without schema", - fnName: tree.MakeUnresolvedName("cdc_mVCC_timeStamp"), + fnName: tree.MakeUnresolvedName("changefeed_creATIon_TimeStamp"), expectedSchema: "public", }, { diff --git a/pkg/ccl/changefeedccl/cdceval/functions.go b/pkg/ccl/changefeedccl/cdceval/functions.go index 58c4c5fcd88e..ebd5cccbaadc 100644 --- a/pkg/ccl/changefeedccl/cdceval/functions.go +++ b/pkg/ccl/changefeedccl/cdceval/functions.go @@ -90,38 +90,26 @@ var cdcFunctions = map[string]*tree.ResolvedFunctionDefinition{ //"st_asgeojson", //"st_estimatedextent", - // NB: even though some cdc functions appear to be stable (e.g. cdc_is_delete()), + // NB: even though some cdc functions appear to be stable (e.g. event_op()), // we should not mark custom CDC functions as stable. Doing so will cause // optimizer to (constant) fold this function during optimization step -- something // we definitely don't want to do because we need to evaluate those functions // for each event. - "cdc_is_delete": makeCDCBuiltIn( - "cdc_is_delete", + "event_op": makeCDCBuiltIn( + "event_op", tree.Overload{ Types: tree.ParamTypes{}, - ReturnType: tree.FixedReturnType(types.Bool), + ReturnType: tree.FixedReturnType(types.String), Fn: func(ctx context.Context, evalCtx *eval.Context, datums tree.Datums) (tree.Datum, error) { rowEvalCtx := rowEvalContextFromEvalContext(evalCtx) - if rowEvalCtx.updatedRow.IsDeleted() { - return tree.DBoolTrue, nil - } - return tree.DBoolFalse, nil + return rowEvalCtx.op, nil }, - Info: "Returns true if the event is a deletion", + Info: "Returns 'insert', 'update', 'upsert' or 'delete' to describe the type of the operation.", Volatility: volatility.Volatile, }), - "cdc_mvcc_timestamp": cdcTimestampBuiltin( - "cdc_mvcc_timestamp", - "Returns MVCC timestamp of the event", - volatility.Volatile, - types.Decimal, - func(rowEvalCtx *rowEvalContext) hlc.Timestamp { - return rowEvalCtx.updatedRow.MvccTimestamp - }, - ), - "cdc_updated_timestamp": cdcTimestampBuiltin( - "cdc_updated_timestamp", - "Returns schema timestamp of the event", + "event_schema_timestamp": cdcTimestampBuiltin( + "event_schema_timestamp", + "Returns schema timestamp of the event.", volatility.Volatile, types.Decimal, func(rowEvalCtx *rowEvalContext) hlc.Timestamp { @@ -130,7 +118,7 @@ var cdcFunctions = map[string]*tree.ResolvedFunctionDefinition{ ), "changefeed_creation_timestamp": cdcTimestampBuiltin( "changefeed_creation_timestamp", - "Returns changefeed creation time", + "Returns changefeed creation time.", volatility.Stable, types.Decimal, func(rowEvalCtx *rowEvalContext) hlc.Timestamp { @@ -139,6 +127,13 @@ var cdcFunctions = map[string]*tree.ResolvedFunctionDefinition{ ), } +var ( + eventTypeInsert = tree.NewDString("insert") + eventTypeUpdate = tree.NewDString("update") + eventTypeUpsert = tree.NewDString("upsert") + eventTypeDelete = tree.NewDString("delete") +) + const cdcFnCategory = "CDC builtin" var cdcFnProps = &tree.FunctionProperties{ @@ -253,7 +248,7 @@ func init() { case volatility.Stable: // If the stable function is not on the white list, // then it is blacklisted. - if _, whitelisted := cdcFunctions[fnDef.Name]; !whitelisted { + if _, allowed := cdcFunctions[fnDef.Name]; !allowed { functionDenyList[fnDef.Name] = struct{}{} } } diff --git a/pkg/ccl/changefeedccl/cdceval/functions_test.go b/pkg/ccl/changefeedccl/cdceval/functions_test.go index 0abb6b23a70f..840c091a5199 100644 --- a/pkg/ccl/changefeedccl/cdceval/functions_test.go +++ b/pkg/ccl/changefeedccl/cdceval/functions_test.go @@ -20,6 +20,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" + "github.com/cockroachdb/cockroach/pkg/sql/randgen" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" @@ -74,8 +77,7 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) { for fn, preferredOverload := range map[string]preferredFn{ "statement_timestamp": expectTSTZ, "transaction_timestamp": expectTSTZ, - "cdc_mvcc_timestamp": expectHLC, - "cdc_updated_timestamp": expectHLC, + "event_schema_timestamp": expectHLC, "changefeed_creation_timestamp": expectHLC, } { t.Run(fn, func(t *testing.T) { @@ -85,22 +87,21 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) { targetTS := rowTS switch fn { - case "cdc_updated_timestamp": + case "event_schema_timestamp": targetTS = schemaTS case "changefeed_creation_timestamp": targetTS = createTS } // We'll run tests against some future time stamp to ensure // that time functions use correct values. - testRow := makeEventRow(t, desc, schemaTS, false, rowTS) - e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, - fmt.Sprintf("SELECT "+ - "%[1]s() AS preferred,"+ // Preferred overload. - "%[1]s():::TIMESTAMPTZ AS tstz,"+ // Force timestamptz overload. - "%[1]s():::TIMESTAMP AS ts,"+ // Force timestamp overload. - "%[1]s():::DECIMAL AS dec,"+ // Force decimal overload. - "%[1]s()::STRING AS str"+ // Casts preferred overload to string. - " FROM foo", fn)) + testRow := makeEventRow(t, desc, schemaTS, false, rowTS, false) + e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, false, fmt.Sprintf("SELECT "+ + "%[1]s() AS preferred,"+ // Preferred overload. + "%[1]s():::TIMESTAMPTZ AS tstz,"+ // Force timestamptz overload. + "%[1]s():::TIMESTAMP AS ts,"+ // Force timestamp overload. + "%[1]s():::DECIMAL AS dec,"+ // Force decimal overload. + "%[1]s()::STRING AS str"+ // Casts preferred overload to string. + " FROM foo", fn)) require.NoError(t, err) defer e.Close() e.statementTS = createTS @@ -132,7 +133,7 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) { // should have no bearing on the returned values -- we should see // the same thing we saw before. updatedExpectations = initialExpectations - case "cdc_updated_timestamp": + case "event_schema_timestamp": targetTS = testRow.SchemaTS fallthrough default: @@ -158,10 +159,9 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) { // timezone. Since we don't do any special setup with session data, the // default timezone is UTC. We'll use a "strange" timezone of -1h33m from // UTC to test conversion. - testRow := makeEventRow(t, desc, s.Clock().Now(), false, futureTS) - e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, - fmt.Sprintf("SELECT timezone('+01:33:00', '%s'::time) FROM foo", - futureTS.GoTime().Format("15:04:05"))) + testRow := makeEventRow(t, desc, s.Clock().Now(), false, futureTS, false) + e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, false, fmt.Sprintf("SELECT timezone('+01:33:00', '%s'::time) FROM foo", + futureTS.GoTime().Format("15:04:05"))) require.NoError(t, err) defer e.Close() @@ -174,21 +174,73 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) { }) }) - t.Run("cdc_is_delete", func(t *testing.T) { + t.Run("event_op", func(t *testing.T) { schemaTS := s.Clock().Now() - testRow := makeEventRow(t, desc, schemaTS, false, s.Clock().Now()) - e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, - "SELECT cdc_is_delete() FROM foo") - require.NoError(t, err) - defer e.Close() + row := makeEventRow(t, desc, schemaTS, false, s.Clock().Now(), true) + deletedRow := makeEventRow(t, desc, schemaTS, true, s.Clock().Now(), true) + nilRow := cdcevent.Row{} + + for _, tc := range []struct { + op string + row cdcevent.Row + prevRow cdcevent.Row + withDiff bool + expect string + }{ + { + op: "insert", + row: row, + prevRow: nilRow, + withDiff: true, + expect: "insert", + }, + { + op: "update", + row: row, + prevRow: row, + withDiff: true, + expect: "update", + }, + { + // Without diff, we can't tell an update from insert, so we emit upsert. + op: "insert", + row: row, + prevRow: nilRow, + withDiff: false, + expect: "upsert", + }, + { + // Without diff, we can't tell an update from insert, so we emit upsert. + op: "update", + row: row, + prevRow: nilRow, + withDiff: false, + expect: "upsert", + }, + { + op: "delete", + row: deletedRow, + prevRow: row, + withDiff: true, + expect: "delete", + }, + { + op: "delete", + row: deletedRow, + prevRow: row, + withDiff: false, + expect: "delete", + }, + } { + t.Run(fmt.Sprintf("%s/diff=%t", tc.op, tc.withDiff), func(t *testing.T) { + e, err := newEvaluator(&execCfg, &semaCtx, tc.row.EventDescriptor, tc.withDiff, "SELECT event_op() FROM foo") + require.NoError(t, err) + defer e.Close() - for _, expectDelete := range []bool{true, false} { - testRow := makeEventRow(t, desc, schemaTS, expectDelete, s.Clock().Now()) - p, err := e.Eval(ctx, testRow, cdcevent.Row{}) - require.NoError(t, err) - require.Equal(t, - map[string]string{"cdc_is_delete": fmt.Sprintf("%t", expectDelete)}, - slurpValues(t, p)) + p, err := e.Eval(ctx, tc.row, tc.prevRow) + require.NoError(t, err) + require.Equal(t, map[string]string{"event_op": tc.expect}, slurpValues(t, p)) + }) } }) @@ -200,10 +252,9 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) { } t.Run("pg_collation_for", func(t *testing.T) { - testRow := makeEventRow(t, desc, s.Clock().Now(), false, s.Clock().Now()) + testRow := makeEventRow(t, desc, s.Clock().Now(), false, s.Clock().Now(), false) - e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, - `SELECT pg_collation_for('hello' COLLATE de_DE) AS col FROM foo`) + e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, false, `SELECT pg_collation_for('hello' COLLATE de_DE) AS col FROM foo`) require.NoError(t, err) defer e.Close() @@ -214,11 +265,10 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) { for _, fn := range []string{"to_json", "to_jsonb"} { t.Run(fn, func(t *testing.T) { - testRow := makeEventRow(t, desc, s.Clock().Now(), false, s.Clock().Now()) + testRow := makeEventRow(t, desc, s.Clock().Now(), false, s.Clock().Now(), false) rowDatums := testRow.EncDatums() - e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, - fmt.Sprintf("SELECT %s(a) FROM foo", fn)) + e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, false, fmt.Sprintf("SELECT %s(a) FROM foo", fn)) require.NoError(t, err) defer e.Close() @@ -231,11 +281,10 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) { } t.Run("row_to_json", func(t *testing.T) { - testRow := makeEventRow(t, desc, s.Clock().Now(), false, s.Clock().Now()) + testRow := makeEventRow(t, desc, s.Clock().Now(), false, s.Clock().Now(), false) rowDatums := testRow.EncDatums() - e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, - "SELECT row_to_json(row(a, b, c)) FROM foo") + e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, false, "SELECT row_to_json(row(a, b, c)) FROM foo") require.NoError(t, err) defer e.Close() @@ -251,10 +300,9 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) { }) t.Run("jsonb_build_array", func(t *testing.T) { - testRow := makeEventRow(t, desc, s.Clock().Now(), false, s.Clock().Now()) + testRow := makeEventRow(t, desc, s.Clock().Now(), false, s.Clock().Now(), false) rowDatums := testRow.EncDatums() - e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, - "SELECT jsonb_build_array(a, a, 42) AS three_ints FROM foo") + e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, false, "SELECT jsonb_build_array(a, a, 42) AS three_ints FROM foo") require.NoError(t, err) defer e.Close() @@ -271,10 +319,9 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) { }) t.Run("jsonb_build_object", func(t *testing.T) { - testRow := makeEventRow(t, desc, s.Clock().Now(), false, s.Clock().Now()) + testRow := makeEventRow(t, desc, s.Clock().Now(), false, s.Clock().Now(), false) rowDatums := testRow.EncDatums() - e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, - "SELECT jsonb_build_object('a', a, 'b', b, 'c', c) AS obj FROM foo") + e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, false, "SELECT jsonb_build_object('a', a, 'b', b, 'c', c) AS obj FROM foo") require.NoError(t, err) defer e.Close() @@ -293,9 +340,8 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) { // These functions have overloads; call the one that's stable overload // (i.e. one that needs to convert types.Any to string). t.Run(fn, func(t *testing.T) { - testRow := makeEventRow(t, desc, s.Clock().Now(), false, s.Clock().Now()) - e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, - fmt.Sprintf("SELECT %s(42) FROM foo", fn)) + testRow := makeEventRow(t, desc, s.Clock().Now(), false, s.Clock().Now(), false) + e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, false, fmt.Sprintf("SELECT %s(42) FROM foo", fn)) require.NoError(t, err) defer e.Close() @@ -309,10 +355,9 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) { // overlaps has many overloads; most of them are immutable, but 1 is stable. t.Run("overlaps", func(t *testing.T) { - testRow := makeEventRow(t, desc, s.Clock().Now(), false, s.Clock().Now()) + testRow := makeEventRow(t, desc, s.Clock().Now(), false, s.Clock().Now(), false) - e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, - `SELECT overlaps(transaction_timestamp(), interval '0', transaction_timestamp(), interval '-1s') FROM foo`) + e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, false, `SELECT overlaps(transaction_timestamp(), interval '0', transaction_timestamp(), interval '-1s') FROM foo`) require.NoError(t, err) defer e.Close() @@ -324,7 +369,7 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) { // Test that cdc specific functions correctly resolve overload, and that an // error is returned when cdc function called with wrong arguments. t.Run("cdc function errors", func(t *testing.T) { - testRow := makeEventRow(t, desc, s.Clock().Now(), false, s.Clock().Now()) + testRow := makeEventRow(t, desc, s.Clock().Now(), false, s.Clock().Now(), false) // currently, all cdc functions take no args, so call these functions with // some arguments. rng, _ := randutil.NewTestRand() @@ -343,8 +388,7 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) { // Run this test only for CDC specific functions. if def != useDefaultBuiltin && def.Overloads[0].FunctionProperties.Category == cdcFnCategory { t.Run(fn, func(t *testing.T) { - e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, - fmt.Sprintf("SELECT %s(%s) FROM foo", fn, fnArgs())) + e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, false, fmt.Sprintf("SELECT %s(%s) FROM foo", fn, fnArgs())) require.NoError(t, err) _, err = e.Eval(ctx, testRow, testRow) require.Regexp(t, "unknown signature", err) @@ -360,9 +404,15 @@ func makeEventRow( schemaTS hlc.Timestamp, deleted bool, mvccTS hlc.Timestamp, + includeMVCCCol bool, ) cdcevent.Row { t.Helper() - datums := randEncDatumPrimaryFamily(t, desc) + rng, _ := randutil.NewTestRand() + + datums := randEncDatumPrimaryFamily(t, rng, desc) + if includeMVCCCol { + datums = append(datums, rowenc.EncDatum{Datum: randgen.RandDatum(rng, colinfo.MVCCTimestampColumnType, false)}) + } r := cdcevent.TestingMakeEventRow(desc, 0, datums, deleted) r.SchemaTS = schemaTS r.MvccTimestamp = mvccTS @@ -370,7 +420,11 @@ func makeEventRow( } func newEvaluator( - execCfg *sql.ExecutorConfig, semaCtx *tree.SemaContext, ed *cdcevent.EventDescriptor, expr string, + execCfg *sql.ExecutorConfig, + semaCtx *tree.SemaContext, + ed *cdcevent.EventDescriptor, + withDiff bool, + expr string, ) (*Evaluator, error) { sc, err := ParseChangefeedExpression(expr) if err != nil { @@ -383,5 +437,5 @@ func newEvaluator( return nil, err } return NewEvaluator(norm.SelectClause, execCfg, username.RootUserName(), - defaultDBSessionData, execCfg.Clock.Now()), nil + defaultDBSessionData, execCfg.Clock.Now(), withDiff), nil } diff --git a/pkg/ccl/changefeedccl/cdceval/validation_test.go b/pkg/ccl/changefeedccl/cdceval/validation_test.go index 085336eee89a..a4e7a4c33911 100644 --- a/pkg/ccl/changefeedccl/cdceval/validation_test.go +++ b/pkg/ccl/changefeedccl/cdceval/validation_test.go @@ -276,7 +276,7 @@ func TestSelectClauseRequiresPrev(t *testing.T) { { name: "nested call to cdc_prev", desc: descs[`foo`], - stmt: "SELECT jsonb_build_object('op',IF(cdc_is_delete(),'u',IF(row_to_json((cdc_prev).*)::string='null','c','u'))) from foo", + stmt: "SELECT jsonb_build_object('op',IF(event_op() = 'delete','u',IF(row_to_json((cdc_prev).*)::string='null','c','u'))) from foo", requiresPrev: true, }, { diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 593d16301426..8b25add26221 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -7075,7 +7075,7 @@ INSERT INTO foo (a, b, e) VALUES (2, 'two', 'closed'); CREATE CHANGEFEED WITH schema_change_policy='stop' AS SELECT * FROM `+fromClause+` -WHERE e IN ('open', 'closed') AND NOT cdc_is_delete()`) +WHERE e IN ('open', 'closed') AND event_op() != 'delete'`) defer closeFeed(t, feed) assertPayloads(t, feed, []string{ diff --git a/pkg/ccl/changefeedccl/event_processing.go b/pkg/ccl/changefeedccl/event_processing.go index 5046b5e49d6f..f52b47c88b4f 100644 --- a/pkg/ccl/changefeedccl/event_processing.go +++ b/pkg/ccl/changefeedccl/event_processing.go @@ -231,7 +231,7 @@ func newKVEventToRowConsumer( var evaluator *cdceval.Evaluator if spec.Select.Expr != "" { - evaluator, err = newEvaluator(ctx, cfg, spec) + evaluator, err = newEvaluator(ctx, cfg, spec, details.Opts.GetFilters().WithDiff) if err != nil { return nil, err } @@ -260,7 +260,10 @@ func newKVEventToRowConsumer( } func newEvaluator( - ctx context.Context, cfg *sql.ExecutorConfig, spec execinfrapb.ChangeAggregatorSpec, + ctx context.Context, + cfg *sql.ExecutorConfig, + spec execinfrapb.ChangeAggregatorSpec, + withDiff bool, ) (*cdceval.Evaluator, error) { sc, err := cdceval.ParseChangefeedExpression(spec.Select.Expr) if err != nil { @@ -290,7 +293,7 @@ func newEvaluator( sd = *spec.Feed.SessionData } - return cdceval.NewEvaluator(sc, cfg, spec.User(), sd, spec.Feed.StatementTime), nil + return cdceval.NewEvaluator(sc, cfg, spec.User(), sd, spec.Feed.StatementTime, withDiff), nil } func (c *kvEventToRowConsumer) topicForEvent(eventMeta cdcevent.Metadata) (TopicDescriptor, error) {