Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changefeedccl: Support more stable functions. #83548

Merged
merged 1 commit into from
Jun 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/cdceval/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ go_library(
"//pkg/sql/parser",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/sem/builtins",
"//pkg/sql/sem/eval",
"//pkg/sql/sem/normalize",
"//pkg/sql/sem/tree",
Expand Down Expand Up @@ -82,6 +81,7 @@ go_test(
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/randutil",
"//pkg/util/timeofday",
"@com_github_stretchr_testify//require",
],
)
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/cdceval/constraint.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func constrainSpansBySelectClause(
}

tableName := tableNameOrAlias(ed.TableName, selectClause.From.Tables[0])
semaCtx := newSemaCtx(ed)
semaCtx := newSemaCtxWithTypeResolver(ed)
return sc.ConstrainPrimaryIndexSpanByExpr(
ctx, sql.BestEffortConstrain, tableName, ed.TableDescriptor(),
evalCtx, semaCtx, selectClause.Where.Expr)
Expand Down
69 changes: 31 additions & 38 deletions pkg/ccl/changefeedccl/cdceval/expr_eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/schemaexpr"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/sql/sem/normalize"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -106,17 +105,18 @@ func (e *Evaluator) initSelectClause(sc *tree.SelectClause) error {
"expected at least 1 projection")
}

semaCtx := newSemaCtx()
e.selectors = sc.Exprs
for _, se := range e.selectors {
expr, err := validateExpressionForCDC(se.Expr)
expr, err := validateExpressionForCDC(se.Expr, semaCtx)
if err != nil {
return err
}
se.Expr = expr
}

if sc.Where != nil {
expr, err := validateExpressionForCDC(sc.Where.Expr)
expr, err := validateExpressionForCDC(sc.Where.Expr, semaCtx)
if err != nil {
return err
}
Expand Down Expand Up @@ -207,7 +207,7 @@ func newExprEval(
cols := ed.ResultColumns()
e := &exprEval{
EventDescriptor: ed,
semaCtx: newSemaCtx(ed),
semaCtx: newSemaCtxWithTypeResolver(ed),
evalCtx: evalCtx.Copy(),
evalHelper: &rowContainer{cols: cols},
projection: cdcevent.MakeProjection(ed),
Expand Down Expand Up @@ -268,6 +268,7 @@ func (e *exprEval) setupContext(
e.rowEvalCtx.prevRow = prevRow
e.rowEvalCtx.mvccTS = mvccTS
e.evalCtx.TxnTimestamp = mvccTS.GoTime()
e.evalCtx.StmtTimestamp = mvccTS.GoTime()

// Clear out all memo records
e.rowEvalCtx.memo.prevJSON = nil
Expand Down Expand Up @@ -482,16 +483,17 @@ func (e *exprEval) evalExpr(
// if it consists of expressions supported by CDC.
// This visitor is used early to sanity check expression.
type cdcExprVisitor struct {
err error
semaCtx *tree.SemaContext
err error
}

var _ tree.Visitor = (*cdcExprVisitor)(nil)

// validateExpressionForCDC runs quick checks to make sure that expr is valid for
// CDC use case. This doesn't catch all the invalid cases, but is a place to pick up
// obviously wrong expressions.
func validateExpressionForCDC(expr tree.Expr) (tree.Expr, error) {
var v cdcExprVisitor
func validateExpressionForCDC(expr tree.Expr, semaCtx *tree.SemaContext) (tree.Expr, error) {
v := cdcExprVisitor{semaCtx: semaCtx}
expr, _ = tree.WalkExpr(&v, expr)
if v.err != nil {
return nil, v.err
Expand All @@ -508,7 +510,7 @@ func (v *cdcExprVisitor) VisitPre(expr tree.Expr) (bool, tree.Expr) {
func (v *cdcExprVisitor) VisitPost(expr tree.Expr) tree.Expr {
switch t := expr.(type) {
case *tree.FuncExpr:
fn, err := checkFunctionSupported(t)
fn, err := checkFunctionSupported(t, v.semaCtx)
if err != nil {
v.err = err
return expr
Expand Down Expand Up @@ -582,19 +584,9 @@ func (v *cdcNameResolver) VisitPost(expr tree.Expr) tree.Expr {
}
}

func resolveCustomCDCFunction(name string, fnCall *tree.FuncExpr) *tree.FuncExpr {
fn, exists := cdcFunctions[name]
if !exists {
return nil
}
return &tree.FuncExpr{
Func: tree.ResolvableFunctionReference{FunctionReference: fn},
Type: fnCall.Type,
Exprs: fnCall.Exprs,
}
}

func checkFunctionSupported(fnCall *tree.FuncExpr) (*tree.FuncExpr, error) {
func checkFunctionSupported(
fnCall *tree.FuncExpr, semaCtx *tree.SemaContext,
) (*tree.FuncExpr, error) {
var fnName string
var fnClass tree.FunctionClass
var fnVolatility volatility.V
Expand All @@ -610,23 +602,19 @@ func checkFunctionSupported(fnCall *tree.FuncExpr) (*tree.FuncExpr, error) {

switch fn := fnCall.Func.FunctionReference.(type) {
case *tree.UnresolvedName:
// We may not have function definition yet if function takes arguments,
// or it's one of the custom cdc functions.
fnName = fn.String()
props, overloads := builtins.GetBuiltinProperties(fn.String())
if props == nil {
if custom := resolveCustomCDCFunction(fnName, fnCall); custom != nil {
return custom, nil
}
funDef, err := fn.ResolveFunction(semaCtx.SearchPath)
if err != nil {
return nil, unsupportedFunctionErr()
}
fnClass = props.Class
// Pick highest volatility overload.
for _, o := range overloads {
if o.Volatility > fnVolatility {
fnVolatility = o.Volatility
}
fnCall = &tree.FuncExpr{
Func: tree.ResolvableFunctionReference{FunctionReference: funDef},
Type: fnCall.Type,
Exprs: fnCall.Exprs,
}
if _, isCDCFn := cdcFunctions[funDef.Name]; isCDCFn {
return fnCall, nil
}
return checkFunctionSupported(fnCall, semaCtx)
case *tree.FunctionDefinition:
fnName, fnClass = fn.Name, fn.Class
if fnCall.ResolvedOverload() != nil {
Expand Down Expand Up @@ -721,16 +709,21 @@ func rowEvalContextFromEvalContext(evalCtx *eval.Context) *rowEvalContext {
const rejectInvalidCDCExprs = (tree.RejectAggregates | tree.RejectGenerators |
tree.RejectWindowApplications | tree.RejectNestedGenerators)

// newSemaCtx returns new tree.SemaCtx configured for cdc.
func newSemaCtx(d *cdcevent.EventDescriptor) *tree.SemaContext {
// newSemaCtx returns new tree.SemaCtx configured for cdc without type resolver.
func newSemaCtx() *tree.SemaContext {
sema := tree.MakeSemaContext()
sema.SearchPath = &cdcCustomFunctionResolver{SearchPath: &sessiondata.DefaultSearchPath}
sema.Properties.Require("cdc", rejectInvalidCDCExprs)
return &sema
}

// newSemaCtxWithTypeResolver returns new tree.SemaCtx configured for cdc.
func newSemaCtxWithTypeResolver(d *cdcevent.EventDescriptor) *tree.SemaContext {
sema := newSemaCtx()
if d.HasUserDefinedTypes() {
sema.TypeResolver = newTypeReferenceResolver(d)
}
return &sema
return sema
}

// cdcTypeReferenceReesolver is responsible for resolving user defined types.
Expand Down
28 changes: 24 additions & 4 deletions pkg/ccl/changefeedccl/cdceval/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,36 @@ import (
// However, we can provide reasonable overrides to a small set of stable
// functions that make sense in the context of CDC.
var supportedVolatileBuiltinFunctions = makeStringSet(
// These functions can be supported given that we set the statement
// and transaction timestamp to be equal to MVCC timestamp of the event.
// TODO(yevgeniy): We also define cdc specific functions, s.a. cdc_mvcc_timestamp
// Maybe delete cdc_ overrides; or.... maybe disallow these builtins in favor of cdc_ specific overrides?
// These functions can be supported given that we set the statement and
// transaction timestamp to be equal to MVCC timestamp of the event.
"current_date",
"current_timestamp",
"localtimestamp",
"localtime",
"now",
"statement_timestamp",
"transaction_timestamp",
"timeofday",
"timezone",

// jsonb functions are stable because they depend on eval
// context DataConversionConfig
"jsonb_build_array",
"jsonb_build_object",
"to_json",
"to_jsonb",
"row_to_json",

// Misc functions that depend on eval context.
"overlaps",
"pg_collation_for",
"pg_typeof",
"quote_literal",
"quote_nullable",

// TODO(yevgeniy): Support geometry.
//"st_asgeojson",
//"st_estimatedextent",
)

// CDC Specific functions.
Expand Down
Loading