diff --git a/pkg/ccl/kvccl/kvfollowerreadsccl/boundedstaleness_test.go b/pkg/ccl/kvccl/kvfollowerreadsccl/boundedstaleness_test.go index 947d06bc3141..3119637a1aea 100644 --- a/pkg/ccl/kvccl/kvfollowerreadsccl/boundedstaleness_test.go +++ b/pkg/ccl/kvccl/kvfollowerreadsccl/boundedstaleness_test.go @@ -55,16 +55,31 @@ func TestBoundedStalenessEnterpriseLicense(t *testing.T) { tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) defer tc.Stopper().Stop(ctx) - queries := []string{ - `SELECT with_max_staleness('10s')`, - `SELECT with_min_timestamp(statement_timestamp())`, + testCases := []struct { + query string + args []interface{} + }{ + { + query: `SELECT with_max_staleness('10s')`, + }, + { + query: `SELECT with_min_timestamp(statement_timestamp())`, + }, + { + query: `SELECT $1::TEXT FROM generate_series(1,1) AS OF SYSTEM TIME with_max_staleness('1s')`, + args: []interface{}{"cat"}, + }, + { + query: `SELECT $1::TEXT FROM generate_series(1,1) AS OF SYSTEM TIME with_min_timestamp(statement_timestamp())`, + args: []interface{}{"cat"}, + }, } defer utilccl.TestingDisableEnterprise()() t.Run("disabled", func(t *testing.T) { - for _, q := range queries { - t.Run(q, func(t *testing.T) { - _, err := tc.Conns[0].QueryContext(ctx, q) + for _, testCase := range testCases { + t.Run(testCase.query, func(t *testing.T) { + _, err := tc.Conns[0].QueryContext(ctx, testCase.query, testCase.args...) require.Error(t, err) require.Contains(t, err.Error(), "use of bounded staleness requires an enterprise license") }) @@ -73,9 +88,9 @@ func TestBoundedStalenessEnterpriseLicense(t *testing.T) { t.Run("enabled", func(t *testing.T) { defer utilccl.TestingEnableEnterprise()() - for _, q := range queries { - t.Run(q, func(t *testing.T) { - r, err := tc.Conns[0].QueryContext(ctx, q) + for _, testCase := range testCases { + t.Run(testCase.query, func(t *testing.T) { + r, err := tc.Conns[0].QueryContext(ctx, testCase.query, testCase.args...) require.NoError(t, err) require.NoError(t, r.Close()) }) diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index c73504d05659..489b5185919f 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -756,38 +756,42 @@ func (ex *connExecutor) handleAOST(ctx context.Context, stmt tree.Statement) err return err } } - } else if p.extendedEvalCtx.AsOfSystemTime.BoundedStaleness { - // This has to be a bounded staleness read with nearest_only=True during - // a retry. The AOST read timestamps are expected to differ. - if p.extendedEvalCtx.AsOfSystemTime.MaxTimestampBound.IsEmpty() { - return errors.AssertionFailedf( - "expected bounded_staleness set with a max_timestamp_bound", - ) - } - } else if *p.extendedEvalCtx.AsOfSystemTime != *asOf { - return errors.AssertionFailedf( - "cannot specify AS OF SYSTEM TIME with different timestamps", - ) + return nil } - } else { - // If we're in an explicit txn, we allow AOST but only if it matches with - // the transaction's timestamp. This is useful for running AOST statements - // using the InternalExecutor inside an external transaction; one might want - // to do that to force p.avoidLeasedDescriptors to be set below. - if asOf.BoundedStaleness { - return pgerror.Newf( - pgcode.FeatureNotSupported, - "cannot use a bounded staleness query in a transaction", - ) + if *p.extendedEvalCtx.AsOfSystemTime == *asOf { + // In most cases, the AOST timestamps are expected to match. + return nil } - if readTs := ex.state.getReadTimestamp(); asOf.Timestamp != readTs { - err = pgerror.Newf(pgcode.Syntax, - "inconsistent AS OF SYSTEM TIME timestamp; expected: %s", readTs) - err = errors.WithHint(err, "try SET TRANSACTION AS OF SYSTEM TIME") - return err + if p.extendedEvalCtx.AsOfSystemTime.BoundedStaleness { + if !p.extendedEvalCtx.AsOfSystemTime.MaxTimestampBound.IsEmpty() { + // This has to be a bounded staleness read with nearest_only=True during + // a retry. The AOST read timestamps are expected to differ. + return nil + } + return errors.AssertionFailedf("expected bounded_staleness set with a max_timestamp_bound") } - p.extendedEvalCtx.AsOfSystemTime = asOf + return errors.AssertionFailedf( + "cannot specify AS OF SYSTEM TIME with different timestamps. expected: %s, got: %s", + p.extendedEvalCtx.AsOfSystemTime.Timestamp, asOf.Timestamp, + ) + } + // If we're in an explicit txn, we allow AOST but only if it matches with + // the transaction's timestamp. This is useful for running AOST statements + // using the InternalExecutor inside an external transaction; one might want + // to do that to force p.avoidLeasedDescriptors to be set below. + if asOf.BoundedStaleness { + return pgerror.Newf( + pgcode.FeatureNotSupported, + "cannot use a bounded staleness query in a transaction", + ) + } + if readTs := ex.state.getReadTimestamp(); asOf.Timestamp != readTs { + err = pgerror.Newf(pgcode.Syntax, + "inconsistent AS OF SYSTEM TIME timestamp; expected: %s, got: %s", readTs, asOf.Timestamp) + err = errors.WithHint(err, "try SET TRANSACTION AS OF SYSTEM TIME") + return err } + p.extendedEvalCtx.AsOfSystemTime = asOf return nil }