Skip to content

Commit

Permalink
Merge #54538 #54539
Browse files Browse the repository at this point in the history
54538: sql: update search_path semantics to match pg now that we support UDS r=otan a=arulajmani

Postgres semantics dictate that the default search_path should be
`$user, public`, where `$user` expands to the current user's username.
This wasn't a problem until now as we lacked support for user defined
schemas, which meant a schema by the name of `$user` wouldn't ever
exist. This however had changed now and this patch brings us in line
with the PG semantics.

Fixes #53560

Release note (sql change): The default search path for all sessions is
now `$user, public` (as opposed to just `public`). This affects our
name resolution semantics -- now, if a table is present in both the
public schema and the schema named the current user's username, an
unqualified object name will be searched/placed in the user's schema.
This doesn't impact the search semantics of tables in
pg_catalog/information_schema/temp_schema -- these continued to be
searched before checking the $user schema and the public schema.

54539: bulkio: Propagate errors when executing schedule. r=miretskiy a=miretskiy

Informs #54484

Use correct error object when checking for retryable errors.
In addition, add a `FOR UPDATE` clause when picking up
schedules to execute to reduce contention.

Release Notes: None

Release Justification: Bug fix; Incorrect handling of transaction
errors resulted in scheduled jobs showing incorrect and confusing
status message.

Co-authored-by: arulajmani <[email protected]>
Co-authored-by: Yevgeniy Miretskiy <[email protected]>
  • Loading branch information
3 people committed Sep 18, 2020
3 parents 8485afe + bd03b17 + badf836 commit 77751ac
Show file tree
Hide file tree
Showing 17 changed files with 209 additions and 59 deletions.
5 changes: 3 additions & 2 deletions pkg/jobs/job_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ SELECT
FROM %s S
WHERE next_run < %s
ORDER BY random()
%s`, env.SystemJobsTableName(), CreatedByScheduledJobs,
%s
FOR UPDATE`, env.SystemJobsTableName(), CreatedByScheduledJobs,
StatusSucceeded, StatusCanceled, StatusFailed,
env.ScheduledJobsTableName(), env.NowExpr(), limitClause)
}
Expand Down Expand Up @@ -298,7 +299,7 @@ func (s *jobScheduler) executeSchedules(
if processErr := withSavePoint(ctx, txn, func() error {
return s.processSchedule(ctx, schedule, numRunning, stats, txn)
}); processErr != nil {
if errors.HasType(err, (*savePointError)(nil)) {
if errors.HasType(processErr, (*savePointError)(nil)) {
return errors.Wrapf(err, "savepoint error for schedule %d", schedule.ScheduleID())
}

Expand Down
49 changes: 49 additions & 0 deletions pkg/jobs/job_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
Expand Down Expand Up @@ -592,3 +593,51 @@ func TestJobSchedulerDaemonUsesSystemTables(t *testing.T) {
return nil
})
}

func TestTransientTxnErrors(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

h, cleanup := newTestHelper(t)
defer cleanup()
ctx := context.Background()

h.sqlDB.Exec(t, "CREATE TABLE defaultdb.foo(a int primary key, b timestamp not null)")

// Setup 10 schedules updating defaultdb.foo timestamp.
for i := 0; i < 10; i++ {
schedule := NewScheduledJob(h.env)
schedule.SetScheduleLabel(fmt.Sprintf("test schedule: %d", i))
schedule.SetOwner("test")
require.NoError(t, schedule.SetSchedule("*/1 * * * *"))
any, err := types.MarshalAny(&jobspb.SqlStatementExecutionArg{
Statement: fmt.Sprintf("UPSERT INTO defaultdb.foo (a, b) VALUES (%d, now())", i),
})
require.NoError(t, err)
schedule.SetExecutionDetails(InlineExecutorName, jobspb.ExecutionArguments{Args: any})
require.NoError(t, schedule.Create(
ctx, h.cfg.InternalExecutor, nil))
}

// Setup numConcurrent workers, each executing maxExec executeSchedule calls.
const maxExec = 100
const numConcurrent = 3
require.NoError(t,
ctxgroup.GroupWorkers(context.Background(), numConcurrent, func(ctx context.Context, _ int) error {
ticker := time.NewTicker(time.Millisecond)
numExecs := 0
for range ticker.C {
h.env.AdvanceTime(time.Minute)
// Transaction retry errors should never bubble up.
require.NoError(t,
h.cfg.DB.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error {
return h.execSchedules(ctx, allSchedules, txn)
}))
numExecs++
if numExecs == maxExec {
return nil
}
}
return nil
}))
}
13 changes: 6 additions & 7 deletions pkg/jobs/schedule_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"fmt"
"strings"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobstest"
Expand All @@ -31,9 +32,6 @@ func TestScheduleControl(t *testing.T) {
th, cleanup := newTestHelper(t)
defer cleanup()

// Inject our test environment into schedule control execution via testing knobs.
th.cfg.TestingKnobs.(*TestingKnobs).JobSchedulerEnv = th.env

t.Run("non-existent", func(t *testing.T) {
for _, command := range []string{
"PAUSE SCHEDULE 123",
Expand Down Expand Up @@ -123,8 +121,7 @@ func TestScheduleControl(t *testing.T) {

func TestJobsControlForSchedules(t *testing.T) {
defer leaktest.AfterTest(t)()
th, cleanup := newTestHelperForTables(t, jobstest.UseSystemTables,
true /* accelerateIntervals */)
th, cleanup := newTestHelperForTables(t, jobstest.UseSystemTables)
defer cleanup()

registry := th.server.JobRegistry().(*Registry)
Expand Down Expand Up @@ -227,10 +224,12 @@ func TestJobsControlForSchedules(t *testing.T) {
func TestFilterJobsControlForSchedules(t *testing.T) {
defer leaktest.AfterTest(t)()
defer ResetConstructors()()
th, cleanup := newTestHelperForTables(t, jobstest.UseSystemTables,
false /* accelerateIntervals */)
th, cleanup := newTestHelperForTables(t, jobstest.UseSystemTables)
defer cleanup()

// Prevent registry from changing job state while running this test.
defer TestingSetAdoptAndCancelIntervals(24*time.Hour, 24*time.Hour)()

registry := th.server.JobRegistry().(*Registry)
blockResume := make(chan struct{})
defer close(blockResume)
Expand Down
37 changes: 16 additions & 21 deletions pkg/jobs/testutils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"context"
"fmt"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
Expand All @@ -33,11 +32,13 @@ import (
"github.com/stretchr/testify/require"
)

type execSchedulesFn func(ctx context.Context, maxSchedules int64, txn *kv.Txn) error
type testHelper struct {
env *jobstest.JobSchedulerTestEnv
server serverutils.TestServerInterface
cfg *scheduledjobs.JobExecutionConfig
sqlDB *sqlutils.SQLRunner
env *jobstest.JobSchedulerTestEnv
server serverutils.TestServerInterface
execSchedules execSchedulesFn
cfg *scheduledjobs.JobExecutionConfig
sqlDB *sqlutils.SQLRunner
}

// newTestHelper creates and initializes appropriate state for a test,
Expand All @@ -51,20 +52,20 @@ type testHelper struct {
// The testHelper will accelerate the adoption and cancellation loops inside of
// the registry.
func newTestHelper(t *testing.T) (*testHelper, func()) {
return newTestHelperForTables(t, jobstest.UseTestTables,
true /* accelerateIntervals */)
return newTestHelperForTables(t, jobstest.UseTestTables)
}

func newTestHelperForTables(
t *testing.T, envTableType jobstest.EnvTablesType, accelerateIntervals bool,
t *testing.T, envTableType jobstest.EnvTablesType,
) (*testHelper, func()) {
var cleanupIntervals func()
if accelerateIntervals {
cleanupIntervals = TestingSetAdoptAndCancelIntervals(10*time.Millisecond, 10*time.Millisecond)
}
var execSchedules execSchedulesFn

// Setup test scheduled jobs table.
env := jobstest.NewJobSchedulerTestEnv(envTableType, timeutil.Now())
knobs := &TestingKnobs{
TakeOverJobsScheduling: func(_ func(ctx context.Context, maxSchedules int64, txn *kv.Txn) error) {
JobSchedulerEnv: env,
TakeOverJobsScheduling: func(daemon func(ctx context.Context, maxSchedules int64, txn *kv.Txn) error) {
execSchedules = daemon
},
}
s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{
Expand All @@ -73,9 +74,6 @@ func newTestHelperForTables(

sqlDB := sqlutils.MakeSQLRunner(db)

// Setup test scheduled jobs table.
env := jobstest.NewJobSchedulerTestEnv(envTableType, timeutil.Now())

if envTableType == jobstest.UseTestTables {
sqlDB.Exec(t, jobstest.GetScheduledJobsTableSchema(env))
sqlDB.Exec(t, jobstest.GetJobsTableSchema(env))
Expand All @@ -91,12 +89,9 @@ func newTestHelperForTables(
DB: kvDB,
TestingKnobs: knobs,
},
sqlDB: sqlDB,
sqlDB: sqlDB,
execSchedules: execSchedules,
}, func() {
if cleanupIntervals != nil {
cleanupIntervals()
}

if envTableType == jobstest.UseTestTables {
sqlDB.Exec(t, "DROP TABLE "+env.SystemJobsTableName())
sqlDB.Exec(t, "DROP TABLE "+env.ScheduledJobsTableName())
Expand Down
9 changes: 1 addition & 8 deletions pkg/sql/catalog/catconstants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,7 @@

package catconstants

import (
"math"

"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
)

// DefaultSearchPath is the search path used by virgin sessions.
var DefaultSearchPath = sessiondata.MakeSearchPath([]string{"public"})
import "math"

// ReportableAppNamePrefix indicates that the application name can be
// reported in telemetry without scrubbing. (Note this only applies to
Expand Down
3 changes: 1 addition & 2 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catconstants"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/database"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
Expand Down Expand Up @@ -545,7 +544,7 @@ func (s *Server) populateMinimalSessionData(sd *sessiondata.SessionData) {
}
}
if len(sd.SearchPath.GetPathArray()) == 0 {
sd.SearchPath = catconstants.DefaultSearchPath
sd.SearchPath = sessiondata.DefaultSearchPathForUser(sd.User)
}
}

Expand Down
8 changes: 6 additions & 2 deletions pkg/sql/distsql/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,8 +284,12 @@ func (ds *ServerImpl) setupFlow(
ApplicationName: req.EvalContext.ApplicationName,
Database: req.EvalContext.Database,
User: req.EvalContext.User,
SearchPath: sessiondata.MakeSearchPath(req.EvalContext.SearchPath).WithTemporarySchemaName(req.EvalContext.TemporarySchemaName),
SequenceState: sessiondata.NewSequenceState(),
SearchPath: sessiondata.MakeSearchPath(
req.EvalContext.SearchPath,
).WithTemporarySchemaName(
req.EvalContext.TemporarySchemaName,
).WithUserSchemaName(req.EvalContext.User),
SequenceState: sessiondata.NewSequenceState(),
DataConversion: sessiondata.DataConversionConfig{
Location: location,
BytesEncodeFormat: be,
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/logictest/testdata/logic_test/discard
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ DISCARD ALL
query T
SHOW SEARCH_PATH
----
public
$user,public

query T
SET timezone = 'Europe/Amsterdam'; SHOW TIMEZONE
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/logictest/testdata/logic_test/pg_catalog
Original file line number Diff line number Diff line change
Expand Up @@ -1854,7 +1854,7 @@ reorder_joins_limit 8 NULL NUL
require_explicit_primary_keys off NULL NULL NULL string
results_buffer_size 16384 NULL NULL NULL string
row_security off NULL NULL NULL string
search_path public NULL NULL NULL string
search_path $user,public NULL NULL NULL string
serial_normalization rowid NULL NULL NULL string
server_encoding UTF8 NULL NULL NULL string
server_version 9.5.0 NULL NULL NULL string
Expand Down Expand Up @@ -1924,7 +1924,7 @@ reorder_joins_limit 8 NULL user
require_explicit_primary_keys off NULL user NULL off off
results_buffer_size 16384 NULL user NULL 16384 16384
row_security off NULL user NULL off off
search_path public NULL user NULL public public
search_path $user,public NULL user NULL $user,public $user,public
serial_normalization rowid NULL user NULL rowid rowid
server_encoding UTF8 NULL user NULL UTF8 UTF8
server_version 9.5.0 NULL user NULL 9.5.0 9.5.0
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/logictest/testdata/logic_test/reset
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ RESET SEARCH_PATH
query T
SHOW SEARCH_PATH
----
public
$user,public

statement error parameter "server_version" cannot be changed
RESET SERVER_VERSION
Expand All @@ -39,7 +39,7 @@ RESET search_path
query T
SHOW search_path
----
public
$user,public

statement ok
RESET client_encoding; RESET NAMES
Expand Down
74 changes: 74 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/schema
Original file line number Diff line number Diff line change
Expand Up @@ -459,3 +459,77 @@ CREATE TABLE new_db.public.bar()

statement ok
CREATE TABLE new_db.testuser.bar()

# cleanup the testuser schema created as part of the CREATE SCHEMA AUTHORIZATION
# command above
statement ok
DROP SCHEMA testuser CASCADE

# If a schema with a username exists, then that should be the first entry in
# the search path.
subtest user_schema_search_path

# Test setup
user root

statement ok
CREATE SCHEMA testuser

statement ok
GRANT ALL ON SCHEMA testuser TO testuser

statement ok
CREATE TABLE public.public_table(a INT)

statement ok
GRANT SELECT ON public.public_table TO testuser

user testuser

statement ok
CREATE TABLE test_table(a INT);

statement error pq: relation "public.test_table" does not exist
SELECT * FROM public.test_table

statement ok
SELECT * FROM testuser.test_table

# Only root has privs to create inside public
user root

statement ok
CREATE TABLE public.test_table(a INT, b INT)

statement ok
GRANT SELECT ON public.test_table TO testuser

user testuser

query I colnames
SELECT * FROM test_table
----
a

query II colnames
SELECT * FROM public.test_table
----
a b

query I colnames
SELECT * FROM public_table
----
a

# The search path is configured to be user specific.
user root

query II colnames
SELECT * FROM test_table
----
a b

query I colnames
SELECT * FROM testuser.test_table
----
a
2 changes: 1 addition & 1 deletion pkg/sql/logictest/testdata/logic_test/show_source
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ reorder_joins_limit 8
require_explicit_primary_keys off
results_buffer_size 16384
row_security off
search_path public
search_path $user,public
serial_normalization rowid
server_encoding UTF8
server_version 9.5.0
Expand Down
3 changes: 1 addition & 2 deletions pkg/sql/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catconstants"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
Expand Down Expand Up @@ -263,7 +262,7 @@ func newInternalPlanner(
ctx := logtags.AddTag(context.Background(), opName, "")

sd := &sessiondata.SessionData{
SearchPath: catconstants.DefaultSearchPath,
SearchPath: sessiondata.DefaultSearchPathForUser(user),
User: user,
Database: "system",
SequenceState: sessiondata.NewSequenceState(),
Expand Down
Loading

0 comments on commit 77751ac

Please sign in to comment.