Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…83526 #83533 #83548 #83559 #83576 #83588

57838: execinfra: remove MetadataTest* processors r=yuzefovich a=yuzefovich

This commit removes `MetadataTestSender` and `MetadataTestReceiver`
processors since they no longer provide much value. I believe they were
introduced when we added a `ProducerMetadata` as a return parameter to
`Next` method in order to ensure that at least some artificial metadata
is propagated correctly throughout the whole flow.

The main goal of this commit is the removal of `fakedist-metadata` and
`5node-metadata` logic test configs in order to speed up the CI time.

The justification for removal of these processors without putting anything
in their place is that these processors are not that useful - the only
thing they can test is that *some* metadata is propagated through the
row-based flows. Note that they don't test whether all necessary
metadata is emitted (for example, whether `LeafTxnFinalState`). We've
been using the vectorized engine as the default for a while now, and these
processors don't get planned with the vectorized flows. Thus, it seems
silly to have a logic test config like `fakedist-metadata` that is part
of the default configs.

Addresses: #57268.

Release note: None

78104: kvserver: include MVCC range keys in replica consistency checks r=aliher1911 a=erikgrinaker

This patch adds handling of MVCC range keys in replica consistency
checks. These are iterated over as part of `MVCCStats` calculations and
hashed similarly to point keys.

Range keys will only exist after the version gate
`ExperimentalMVCCRangeTombstones` has been enabled, so a separate
version gate is not necessary.

Release note: None

81880: changefeedccl: fix changefeed telemetry resolved r=samiskin a=samiskin

Resolves #81599

Our internal-use changefeed create / failed telemetry events would
incorrectly record "yes" for any non-no value of resolved, rather than
using the value that was passed in.  This change resolves that, emitting
yes for "resolved" and the value itself for "resolved=<value>".

Release note: None

82686: sql: support ttl_expiration_expression for row-level TTL r=otan a=ecwall

refs #76916

Release note (sql change): Allow `CREATE TABLE ... WITH (ttl_expiration_expression='...')`.
Allow `ALTER TABLE ... SET (ttl_expiration_expression='...')` and
`ALTER TABLE ... RESET (ttl_expiration_expression)`. ttl_expiration_expression accepts
an expression that returns a timestamp to support custom TTL calculation.

82885: streamingccl: minor error style cleanups r=miretskiy,HonoreDB a=stevendanna

Capitalized error messages are rare in the code base, so I've made
these more consistent with the rest of the code base.

Release note: None

83004: roachprod: add prometheus/grafana monitoring r=irfansharif a=msbutler

Previously, only roachtests could spin up prom/grafana servers that lasted the
lifetime of the roachtest. This PR introduces new roachprod cmds that allow
a roachprod user to easily spin up/down their own prom/grafana instances. The PR
also hooks up roachtests that rely on prom/grafana into this new infrastructure.

Release note: none

83027: sql: add plan gist to sampled query telemetry log r=THardy98 a=THardy98

Partially resolves: #71328

This change adds a plan gist field to the sampled query telemetry log.
The plan gist is written as a base64 encoded string.

Release note (sql change): The sampled query telemetry log now includes
a plan gist field. The plan gist field provides a compact representation
of a logical plan for the sampled query, the field is written as a
base64 encoded string.

83445: roachtest: skip decommissionBench/nodes=8/cpu=16/warehouses=3000 r=erikgrinaker a=tbg

#82870

Release note: None


83505: streamingccl: deflake TestPartitionedStreamReplicationClient r=miretskiy a=stevendanna

Previously, this test would fail occasionally with:

```
    partitioned_stream_client_test.go:192:
                Error Trace:    partitioned_stream_client_test.go:192
                Error:          Target error should be in err chain:
                                expected: "context canceled"
                                in chain: "pq: query execution canceled"
                Test:           TestPartitionedStreamReplicationClient
```

This is a result of the lib/pq library asyncronously sending a
CancelRequest when it notices the context cancellation. The cancel
request may result in the query ending before the local context
cancellation produces an error.

We now count this query cancellation error as an acceptable response
since the goal of the test is to assert that we respond to context
cancellation.

Fixes #76919

Release note: None

83526: backupccl: create tree.SystemUsers, a new DescriptorCoverage enum r=adityamaru a=msbutler

Previously during planning and execution RESTORE SYSTEM USERS was identified by
a `jobDetails` field. This refactor now identifies this flavor of
restore with a new  DescriptorCoverage enum value, `tree.SystemUsers.

This refactor eases the logic around exposing extra processing steps for
flavors of backup/restore that target different sets of descriptors.

Release note: None

83533: amazon: add custom retryer to retry on `read: connection reset` r=miretskiy,stevendanna a=adityamaru

This change implements a custom retryer that we use when
initializing a new s3 client for interaction with the external
storage sink. This change was motivated by the increased number
of backup job failures we were observing with a `read: connection reset`
error being thrown by s3.

A read connection reset error is thrown when the SDK is unable to read
the response of an underlying API request due to a connection reset. The
DefaultRetryer in the AWS SDK does not treat this error as a retryable error
since the SDK does not have knowledge about the idempotence of the request,
and whether it is safe to retry -
aws/aws-sdk-go#2926 (comment).
In CRDB all operations with s3 (read, write, list) are considered idempotent,
and so we can treat the read connection reset error as retryable too.

Release note (bug fix): Retry s3 operations when they error out with a
read connection reset error instead of failing the top level job.

83548: changefeedccl: Support more stable functions. r=miretskiy a=miretskiy

Add support for additional stable functions to CDC expressions.

Fixes #83466

Release Notes: None

83559: sql: ensure that the plan is closed in apply joins in some error cases r=yuzefovich a=yuzefovich

Previously, it was possible for the apply join's plan to be left
unclosed when an error is encountered during the physical planning of
the main query, and this has now been fixed. We do so by explicitly
closing the plan in such a scenario.

Fixes: #82705.
Fixes: #83368.

Release note: None

83576: streamingccl: re-enabled TestRandomClientGeneration r=miretskiy a=stevendanna

TestRandomClientGeneration was skipped in #61292 as a flake. However,
in the time since then, other changes in this code broke this test
more completely. Re-enabling the test required a few unrelated
changes:

- The stream ingestion processor required a fully formed job to be
  able to poll the cutover time. Now, test code can set a
  cutoverProvider that doesn't depend on a full job record.

- The request intercepting depended on an explicit client being
  set. This test was rather passing the processor a randgen URI. Now we
  pass the client explicitly and also update the test code to make it
  clear that the stream URI isn't actually used for anything.

- The code was attempting to validate the number of rows using SQL. I
  haven't dug into how this was working in the past. But as we are
  connecting to the host tenant and the keys are being ingested to a
  guest tenant, we would need a connection to the guest tenant to
  validate the table data. I've simply removed this assertion since I
  don't think it was testing very much compared to the KV level
  assertions also used in the test.

- The test code assumed that the partitions were keyed based on the
  subscription token rather than the subscription ID.

It isn't clear what the original source of the flakiness was.
However, the test has run a few hundred times under stress without
issue.

Alternatively, we could just delete this test.

Fixes #61287

Release note: None

83588: tree: DROP ROLE should not implement CCLOnlyStatement r=rafiss a=rafiss

This was moved out of CCL licensing a few releases ago.

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Erik Grinaker <[email protected]>
Co-authored-by: Shiranka Miskin <[email protected]>
Co-authored-by: Evan Wall <[email protected]>
Co-authored-by: Steven Danna <[email protected]>
Co-authored-by: Michael Butler <[email protected]>
Co-authored-by: Thomas Hardy <[email protected]>
Co-authored-by: Tobias Grieger <[email protected]>
Co-authored-by: Aditya Maru <[email protected]>
Co-authored-by: Yevgeniy Miretskiy <[email protected]>
Co-authored-by: Rafi Shamim <[email protected]>
  • Loading branch information
12 people committed Jun 29, 2022
Show file tree
Hide file tree
Showing 97 changed files with 2,351 additions and 1,457 deletions.
1 change: 1 addition & 0 deletions docs/generated/eventlog.md
Original file line number Diff line number Diff line change
Expand Up @@ -2458,6 +2458,7 @@ contains common SQL event/execution details.
| `SkippedQueries` | skipped_queries indicate how many SQL statements were not considered for sampling prior to this one. If the field is omitted, or its value is zero, this indicates that no statement was omitted since the last event. | no |
| `CostEstimate` | Cost of the query as estimated by the optimizer. | no |
| `Distribution` | The distribution of the DistSQL query plan (local, full, or partial). | no |
| `PlanGist` | The query's plan gist bytes as a base64 encoded string. | no |


#### Common fields
Expand Down
14 changes: 12 additions & 2 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -1480,7 +1480,7 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro
if err := r.cleanupTempSystemTables(ctx, nil /* txn */); err != nil {
return err
}
} else if details.RestoreSystemUsers {
} else if isSystemUserRestore(details) {
if err := r.restoreSystemUsers(ctx, p.ExecCfg().DB, mainData.systemTables); err != nil {
return err
}
Expand Down Expand Up @@ -1531,6 +1531,16 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro
return nil
}

// isSystemUserRestore checks if the user called RESTORE SYSTEM USERS and guards
// against any mixed version issues. In 22.2, details.DescriptorCoverage
// identifies a system user restore, while in 22.1, details.RestoreSystemUsers
// identified this flavour of restore.
//
// TODO(msbutler): delete in 23.1
func isSystemUserRestore(details jobspb.RestoreDetails) bool {
return details.DescriptorCoverage == tree.SystemUsers || details.RestoreSystemUsers
}

func revalidateIndexes(
ctx context.Context,
execCfg *sql.ExecutorConfig,
Expand Down Expand Up @@ -1647,7 +1657,7 @@ func (r *restoreResumer) notifyStatsRefresherOfNewTables() {
// This is the last of the IDs pre-allocated by the restore planner.
// TODO(postamar): Store it directly in the details instead? This is brittle.
func tempSystemDatabaseID(details jobspb.RestoreDetails) descpb.ID {
if details.DescriptorCoverage != tree.AllDescriptors && !details.RestoreSystemUsers {
if details.DescriptorCoverage != tree.AllDescriptors && !isSystemUserRestore(details) {
return descpb.InvalidID
}
var maxPreAllocatedID descpb.ID
Expand Down
29 changes: 17 additions & 12 deletions pkg/ccl/backupccl/restore_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ func allocateDescriptorRewrites(
opts tree.RestoreOptions,
intoDB string,
newDBName string,
restoreSystemUsers bool,
) (jobspb.DescRewriteMap, error) {
descriptorRewrites := make(jobspb.DescRewriteMap)

Expand Down Expand Up @@ -292,7 +291,7 @@ func allocateDescriptorRewrites(
// in the backup and current max desc ID in the restoring cluster. This generator
// keeps produced the next descriptor ID.
var tempSysDBID descpb.ID
if descriptorCoverage == tree.AllDescriptors || restoreSystemUsers {
if descriptorCoverage == tree.AllDescriptors || descriptorCoverage == tree.SystemUsers {
var err error
if descriptorCoverage == tree.AllDescriptors {
// Restore the key which generates descriptor IDs.
Expand Down Expand Up @@ -323,7 +322,7 @@ func allocateDescriptorRewrites(
if err != nil {
return nil, err
}
} else if restoreSystemUsers {
} else if descriptorCoverage == tree.SystemUsers {
tempSysDBID, err = descidgen.GenerateUniqueDescID(ctx, p.ExecCfg().DB, p.ExecCfg().Codec)
if err != nil {
return nil, err
Expand Down Expand Up @@ -706,7 +705,7 @@ func allocateDescriptorRewrites(
// backup should have the same ID as they do in the backup.
descriptorsToRemap := make([]catalog.Descriptor, 0, len(tablesByID))
for _, table := range tablesByID {
if descriptorCoverage == tree.AllDescriptors || restoreSystemUsers {
if descriptorCoverage == tree.AllDescriptors || descriptorCoverage == tree.SystemUsers {
if table.ParentID == systemschema.SystemDB.GetID() {
// This is a system table that should be marked for descriptor creation.
descriptorsToRemap = append(descriptorsToRemap, table)
Expand Down Expand Up @@ -988,7 +987,6 @@ func restoreJobDescription(
kmsURIs []string,
) (string, error) {
r := &tree.Restore{
SystemUsers: restore.SystemUsers,
DescriptorCoverage: restore.DescriptorCoverage,
AsOf: restore.AsOf,
Targets: restore.Targets,
Expand Down Expand Up @@ -1065,7 +1063,7 @@ func restorePlanHook(

var intoDBFn func() (string, error)
if restoreStmt.Options.IntoDB != nil {
if restoreStmt.SystemUsers {
if restoreStmt.DescriptorCoverage == tree.SystemUsers {
return nil, nil, nil, false, errors.New("cannot set into_db option when only restoring system users")
}
intoDBFn, err = p.TypeAsString(ctx, restoreStmt.Options.IntoDB, "RESTORE")
Expand Down Expand Up @@ -1105,12 +1103,13 @@ func restorePlanHook(

var newDBNameFn func() (string, error)
if restoreStmt.Options.NewDBName != nil {
if restoreStmt.DescriptorCoverage == tree.AllDescriptors || len(restoreStmt.Targets.Databases) != 1 {
if restoreStmt.DescriptorCoverage == tree.AllDescriptors ||
len(restoreStmt.Targets.Databases) != 1 {
err = errors.New("new_db_name can only be used for RESTORE DATABASE with a single target" +
" database")
return nil, nil, nil, false, err
}
if restoreStmt.SystemUsers {
if restoreStmt.DescriptorCoverage == tree.SystemUsers {
return nil, nil, nil, false, errors.New("cannot set new_db_name option when only restoring system users")
}
newDBNameFn, err = p.TypeAsString(ctx, restoreStmt.Options.NewDBName, "RESTORE")
Expand Down Expand Up @@ -1581,7 +1580,7 @@ func doRestorePlan(
}

sqlDescs, restoreDBs, tenants, err := selectTargets(
ctx, p, mainBackupManifests, restoreStmt.Targets, restoreStmt.DescriptorCoverage, endTime, restoreStmt.SystemUsers,
ctx, p, mainBackupManifests, restoreStmt.Targets, restoreStmt.DescriptorCoverage, endTime,
)
if err != nil {
return errors.Wrap(err,
Expand Down Expand Up @@ -1734,8 +1733,7 @@ func doRestorePlan(
restoreStmt.DescriptorCoverage,
restoreStmt.Options,
intoDB,
newDBName,
restoreStmt.SystemUsers)
newDBName)
if err != nil {
return err
}
Expand Down Expand Up @@ -1835,7 +1833,14 @@ func doRestorePlan(
RevalidateIndexes: revalidateIndexes,
DatabaseModifiers: databaseModifiers,
DebugPauseOn: debugPauseOn,
RestoreSystemUsers: restoreStmt.SystemUsers,

// A RESTORE SYSTEM USERS planned on a 22.1 node will use the
// RestoreSystemUsers field in the job details to identify this flavour of
// RESTORE. We must continue to check this field for mixed-version
// compatability.
//
// TODO(msbutler): Delete in 23.1
RestoreSystemUsers: restoreStmt.DescriptorCoverage == tree.SystemUsers,
PreRewriteTenantId: oldTenantID,
Validation: jobspb.RestoreValidation_DefaultRestore,
},
Expand Down
3 changes: 1 addition & 2 deletions pkg/ccl/backupccl/targets.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,15 +339,14 @@ func selectTargets(
targets tree.TargetList,
descriptorCoverage tree.DescriptorCoverage,
asOf hlc.Timestamp,
restoreSystemUsers bool,
) ([]catalog.Descriptor, []catalog.DatabaseDescriptor, []descpb.TenantInfoWithUsage, error) {
allDescs, lastBackupManifest := backupinfo.LoadSQLDescsFromBackupsAtTime(backupManifests, asOf)

if descriptorCoverage == tree.AllDescriptors {
return fullClusterTargetsRestore(allDescs, lastBackupManifest)
}

if restoreSystemUsers {
if descriptorCoverage == tree.SystemUsers {
systemTables := make([]catalog.Descriptor, 0)
var users catalog.Descriptor
for _, desc := range allDescs {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/cdceval/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ go_library(
"//pkg/sql/parser",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/sem/builtins",
"//pkg/sql/sem/eval",
"//pkg/sql/sem/normalize",
"//pkg/sql/sem/tree",
Expand Down Expand Up @@ -82,6 +81,7 @@ go_test(
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/randutil",
"//pkg/util/timeofday",
"@com_github_stretchr_testify//require",
],
)
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/cdceval/constraint.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func constrainSpansBySelectClause(
}

tableName := tableNameOrAlias(ed.TableName, selectClause.From.Tables[0])
semaCtx := newSemaCtx(ed)
semaCtx := newSemaCtxWithTypeResolver(ed)
return sc.ConstrainPrimaryIndexSpanByExpr(
ctx, sql.BestEffortConstrain, tableName, ed.TableDescriptor(),
evalCtx, semaCtx, selectClause.Where.Expr)
Expand Down
69 changes: 31 additions & 38 deletions pkg/ccl/changefeedccl/cdceval/expr_eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/schemaexpr"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/sql/sem/normalize"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -106,17 +105,18 @@ func (e *Evaluator) initSelectClause(sc *tree.SelectClause) error {
"expected at least 1 projection")
}

semaCtx := newSemaCtx()
e.selectors = sc.Exprs
for _, se := range e.selectors {
expr, err := validateExpressionForCDC(se.Expr)
expr, err := validateExpressionForCDC(se.Expr, semaCtx)
if err != nil {
return err
}
se.Expr = expr
}

if sc.Where != nil {
expr, err := validateExpressionForCDC(sc.Where.Expr)
expr, err := validateExpressionForCDC(sc.Where.Expr, semaCtx)
if err != nil {
return err
}
Expand Down Expand Up @@ -207,7 +207,7 @@ func newExprEval(
cols := ed.ResultColumns()
e := &exprEval{
EventDescriptor: ed,
semaCtx: newSemaCtx(ed),
semaCtx: newSemaCtxWithTypeResolver(ed),
evalCtx: evalCtx.Copy(),
evalHelper: &rowContainer{cols: cols},
projection: cdcevent.MakeProjection(ed),
Expand Down Expand Up @@ -268,6 +268,7 @@ func (e *exprEval) setupContext(
e.rowEvalCtx.prevRow = prevRow
e.rowEvalCtx.mvccTS = mvccTS
e.evalCtx.TxnTimestamp = mvccTS.GoTime()
e.evalCtx.StmtTimestamp = mvccTS.GoTime()

// Clear out all memo records
e.rowEvalCtx.memo.prevJSON = nil
Expand Down Expand Up @@ -482,16 +483,17 @@ func (e *exprEval) evalExpr(
// if it consists of expressions supported by CDC.
// This visitor is used early to sanity check expression.
type cdcExprVisitor struct {
err error
semaCtx *tree.SemaContext
err error
}

var _ tree.Visitor = (*cdcExprVisitor)(nil)

// validateExpressionForCDC runs quick checks to make sure that expr is valid for
// CDC use case. This doesn't catch all the invalid cases, but is a place to pick up
// obviously wrong expressions.
func validateExpressionForCDC(expr tree.Expr) (tree.Expr, error) {
var v cdcExprVisitor
func validateExpressionForCDC(expr tree.Expr, semaCtx *tree.SemaContext) (tree.Expr, error) {
v := cdcExprVisitor{semaCtx: semaCtx}
expr, _ = tree.WalkExpr(&v, expr)
if v.err != nil {
return nil, v.err
Expand All @@ -508,7 +510,7 @@ func (v *cdcExprVisitor) VisitPre(expr tree.Expr) (bool, tree.Expr) {
func (v *cdcExprVisitor) VisitPost(expr tree.Expr) tree.Expr {
switch t := expr.(type) {
case *tree.FuncExpr:
fn, err := checkFunctionSupported(t)
fn, err := checkFunctionSupported(t, v.semaCtx)
if err != nil {
v.err = err
return expr
Expand Down Expand Up @@ -582,19 +584,9 @@ func (v *cdcNameResolver) VisitPost(expr tree.Expr) tree.Expr {
}
}

func resolveCustomCDCFunction(name string, fnCall *tree.FuncExpr) *tree.FuncExpr {
fn, exists := cdcFunctions[name]
if !exists {
return nil
}
return &tree.FuncExpr{
Func: tree.ResolvableFunctionReference{FunctionReference: fn},
Type: fnCall.Type,
Exprs: fnCall.Exprs,
}
}

func checkFunctionSupported(fnCall *tree.FuncExpr) (*tree.FuncExpr, error) {
func checkFunctionSupported(
fnCall *tree.FuncExpr, semaCtx *tree.SemaContext,
) (*tree.FuncExpr, error) {
var fnName string
var fnClass tree.FunctionClass
var fnVolatility volatility.V
Expand All @@ -610,23 +602,19 @@ func checkFunctionSupported(fnCall *tree.FuncExpr) (*tree.FuncExpr, error) {

switch fn := fnCall.Func.FunctionReference.(type) {
case *tree.UnresolvedName:
// We may not have function definition yet if function takes arguments,
// or it's one of the custom cdc functions.
fnName = fn.String()
props, overloads := builtins.GetBuiltinProperties(fn.String())
if props == nil {
if custom := resolveCustomCDCFunction(fnName, fnCall); custom != nil {
return custom, nil
}
funDef, err := fn.ResolveFunction(semaCtx.SearchPath)
if err != nil {
return nil, unsupportedFunctionErr()
}
fnClass = props.Class
// Pick highest volatility overload.
for _, o := range overloads {
if o.Volatility > fnVolatility {
fnVolatility = o.Volatility
}
fnCall = &tree.FuncExpr{
Func: tree.ResolvableFunctionReference{FunctionReference: funDef},
Type: fnCall.Type,
Exprs: fnCall.Exprs,
}
if _, isCDCFn := cdcFunctions[funDef.Name]; isCDCFn {
return fnCall, nil
}
return checkFunctionSupported(fnCall, semaCtx)
case *tree.FunctionDefinition:
fnName, fnClass = fn.Name, fn.Class
if fnCall.ResolvedOverload() != nil {
Expand Down Expand Up @@ -721,16 +709,21 @@ func rowEvalContextFromEvalContext(evalCtx *eval.Context) *rowEvalContext {
const rejectInvalidCDCExprs = (tree.RejectAggregates | tree.RejectGenerators |
tree.RejectWindowApplications | tree.RejectNestedGenerators)

// newSemaCtx returns new tree.SemaCtx configured for cdc.
func newSemaCtx(d *cdcevent.EventDescriptor) *tree.SemaContext {
// newSemaCtx returns new tree.SemaCtx configured for cdc without type resolver.
func newSemaCtx() *tree.SemaContext {
sema := tree.MakeSemaContext()
sema.SearchPath = &cdcCustomFunctionResolver{SearchPath: &sessiondata.DefaultSearchPath}
sema.Properties.Require("cdc", rejectInvalidCDCExprs)
return &sema
}

// newSemaCtxWithTypeResolver returns new tree.SemaCtx configured for cdc.
func newSemaCtxWithTypeResolver(d *cdcevent.EventDescriptor) *tree.SemaContext {
sema := newSemaCtx()
if d.HasUserDefinedTypes() {
sema.TypeResolver = newTypeReferenceResolver(d)
}
return &sema
return sema
}

// cdcTypeReferenceReesolver is responsible for resolving user defined types.
Expand Down
28 changes: 24 additions & 4 deletions pkg/ccl/changefeedccl/cdceval/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,36 @@ import (
// However, we can provide reasonable overrides to a small set of stable
// functions that make sense in the context of CDC.
var supportedVolatileBuiltinFunctions = makeStringSet(
// These functions can be supported given that we set the statement
// and transaction timestamp to be equal to MVCC timestamp of the event.
// TODO(yevgeniy): We also define cdc specific functions, s.a. cdc_mvcc_timestamp
// Maybe delete cdc_ overrides; or.... maybe disallow these builtins in favor of cdc_ specific overrides?
// These functions can be supported given that we set the statement and
// transaction timestamp to be equal to MVCC timestamp of the event.
"current_date",
"current_timestamp",
"localtimestamp",
"localtime",
"now",
"statement_timestamp",
"transaction_timestamp",
"timeofday",
"timezone",

// jsonb functions are stable because they depend on eval
// context DataConversionConfig
"jsonb_build_array",
"jsonb_build_object",
"to_json",
"to_jsonb",
"row_to_json",

// Misc functions that depend on eval context.
"overlaps",
"pg_collation_for",
"pg_typeof",
"quote_literal",
"quote_nullable",

// TODO(yevgeniy): Support geometry.
//"st_asgeojson",
//"st_estimatedextent",
)

// CDC Specific functions.
Expand Down
Loading

0 comments on commit 0917fdc

Please sign in to comment.