
jobs: Rewrite job scheduler query logic. #78564

Merged (1 commit) on Mar 31, 2022

Conversation

@miretskiy (Contributor)

Prior to this change, the job scheduler would look up a set of
schedules to execute, and it would lock each schedule via a FOR UPDATE
clause. The query was a complex one that also performed joins
against the system.jobs table. This resulted in a larger read set of rows
being locked, and it made transaction restarts more expensive.

This PR modifies the querying logic so that the scheduler first
obtains a set of candidate schedules to execute. Each schedule then
executes under its own transaction, in which only that single schedule is
locked for update (to guarantee that only one scheduler executes it).

Release note (enterprise change): The job scheduler is more efficient
and should no longer lock up the jobs and scheduled_jobs tables.

Release justification: Stability improvement for the scheduled jobs system.

@cockroach-teamcity (Member)

This change is Reviewable

@nvanbenschoten (Member)

@miretskiy It would be very helpful if we included the EXPLAIN plan in a comment above each SQL query. As we've seen, the potential for transaction contention is closely related to how constrained the underlying index scans are on these tables.
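For illustration, such a comment might look roughly like this (the constant name here is hypothetical and the plan text is abbreviated from the plans posted below):

```go
package jobs

// listCandidateSchedulesQuery picks a random sample of schedules that are due to run.
//
// EXPLAIN (abbreviated):
//   • scan
//       table: scheduled_jobs@next_run_idx
//       spans: (/NULL - /now()]
const listCandidateSchedulesQuery = `
SELECT schedule_id FROM system.scheduled_jobs
 WHERE next_run < now()
 ORDER BY random()
 LIMIT 10`
```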

@miretskiy (Contributor, Author)

Good idea, @nvanbenschoten. Will do once I rebase/clean up and get this ready for review.

@miretskiy miretskiy force-pushed the scheduler_rewrite branch 2 times, most recently from 74882db to f131804 on March 29, 2022 12:04
@miretskiy (Contributor, Author)

miretskiy commented Mar 29, 2022

executeSchedules now runs a simple query under a nil txn; the plan uses the correct next_run index:

[email protected]:26257/movr> explain SELECT schedule_id FROM system.scheduled_jobs WHERE next_run < now() ORDER BY random() LIMIT 10;
                               info
------------------------------------------------------------------
  distribution: local
  vectorized: true

  • top-k
  │ order: +column13
  │ k: 10
  │
  └── • render
      │
      └── • scan
            missing stats
            table: scheduled_jobs@next_run_idx
            spans: (/NULL - /'2022-03-29 12:07:30.689322+00:00']
(13 rows)

After that query, for each candidate row, we execute:

[email protected]:26257/movr> explain SELECT * FROM system.scheduled_jobs WHERE schedule_id=123 AND next_run < now() FOR UPDATE;
                                                                                               info
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
  distribution: local
  vectorized: true

  • filter
  │ filter: next_run < '2022-03-29 12:09:36.471592+00:00'
  │
  └── • scan
        missing stats
        table: scheduled_jobs@primary
        spans: [/123 - /123]
        locking strength: for update

  index recommendations: 1
  1. type: index creation
     SQL command: CREATE INDEX ON scheduled_jobs (schedule_id, next_run) STORING (schedule_name, created, owner, schedule_state, schedule_expr, schedule_details, executor_type, execution_args);
(15 rows)

I'm not sure why it recommends creating an index on (schedule_id, next_run) -- it's still a point lookup. I could drop the "WHERE next_run < now()" clause and just parse/compare against the current time in Go instead. Not sure it's worth it. This query runs under the txn.

Once we get a non-nil row from the above (i.e. a schedule ready to run), we count how many of its jobs are running, using a nil txn.

[email protected]:26257/movr> explain SELECT count(*) FROM system.jobs WHERE created_by_type = 'xxx' AND created_by_id = 123 AND status IN ('blah', 'blah');
                                           info
-------------------------------------------------------------------------------------------
  distribution: local
  vectorized: true

  • group (scalar)
  │
  └── • filter
      │ filter: status = 'blah'
      │
      └── • scan
            missing stats
            table: jobs@jobs_created_by_type_created_by_id_idx
            spans: [/'xxx'/123 - /'xxx'/123]

  index recommendations: 1
  1. type: index creation
     SQL command: CREATE INDEX ON jobs (status) STORING (created_by_type, created_by_id);
(16 rows)

Again, not sure why we get an index creation suggestion -- jobs@jobs_created_by_type_created_by_id_idx already stores status... so, why?

The subsequent logic runs under the txn used to lock the single row for update:
Savepoint
ProcessSchedule
Rollback or commit based on the ProcessSchedule result

We are using a savepoint so that failures during schedule execution can be rolled back, and those failures can be handled
as specified by the schedule's policy (reschedule, retry, etc). I think that's fine.

The big change, of course, is executing the first query without any explicit txn and without FOR UPDATE. Also, the lookup against the jobs table doesn't use the txn -- thus no increase in its read set. Just one row ought to be locked for update. Of course, the schedule executor could still do something silly (like read the entire jobs table under the txn) -- but we have a separate issue for that.
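Putting the pieces together, here is a minimal sketch of the per-schedule flow described above. It is illustrative only: it uses database/sql rather than the scheduler's internal executor, and claimAndProcessSchedule / processSchedule are hypothetical names.

```go
package scheduler

import (
	"context"
	"database/sql"
	"errors"
)

// processSchedule stands in for the schedule executor; it is hypothetical and
// would run the schedule and update its next_run inside tx.
func processSchedule(ctx context.Context, tx *sql.Tx, id int64) error { return nil }

// claimAndProcessSchedule locks exactly one schedule row FOR UPDATE, runs the
// schedule under a savepoint, and commits or rolls back based on the result.
func claimAndProcessSchedule(ctx context.Context, db *sql.DB, scheduleID int64) error {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback() // no-op once the transaction has committed

	// Lock just this schedule; if another scheduler already claimed it and
	// advanced next_run, this returns no row and we simply move on.
	var id int64
	err = tx.QueryRowContext(ctx,
		`SELECT schedule_id FROM system.scheduled_jobs
		  WHERE schedule_id = $1 AND next_run < now() FOR UPDATE`,
		scheduleID).Scan(&id)
	if errors.Is(err, sql.ErrNoRows) {
		return nil // schedule not runnable
	} else if err != nil {
		return err
	}

	// Run the schedule under a savepoint so that a failure can be rolled back
	// and then handled per the schedule's error policy (reschedule, retry, etc.).
	if _, err := tx.ExecContext(ctx, "SAVEPOINT process_schedule"); err != nil {
		return err
	}
	if procErr := processSchedule(ctx, tx, id); procErr != nil {
		if _, err := tx.ExecContext(ctx, "ROLLBACK TO SAVEPOINT process_schedule"); err != nil {
			return err
		}
		// The error-handling policy would update the schedule here, still
		// inside the same transaction.
	}
	return tx.Commit()
}
```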

@ajwerner (Contributor)

I think you meant to tag @nvanbenschoten

@miretskiy (Contributor, Author)

I think you meant to tag @nvanbenschoten

Yeah... Completion failed me.

@miretskiy miretskiy marked this pull request as ready for review March 29, 2022 20:22
@miretskiy miretskiy requested a review from a team as a code owner March 29, 2022 20:22
@miretskiy miretskiy requested review from a team, HonoreDB, stevendanna and nvanbenschoten and removed request for a team March 29, 2022 20:22
if n, ok := row[0].(*tree.DInt); ok {
    return j, int64(*n), nil
}
if row == nil {
    return nil, errScheduleNotRunnable
}
Reviewer comment (Contributor):

Should we add (wrap) a little information here, such as the scheduleID? Just in case this happens more frequently than we expect.

@HonoreDB (Contributor)

HonoreDB commented Mar 29, 2022

Again, not sure why we get index creation suggestion

Probably on your table, status has better specificity... lots of old job records no longer running with the same created_by. Might actually be realistic.

@miretskiy (Contributor, Author)

Probably on your table, status has better specificity... lots of old job records no longer running with the same created_by. Might actually be realistic.

But the created_by index also stores status....

@miretskiy miretskiy requested a review from dt March 30, 2022 12:22
@nvanbenschoten (Member)

This all looks much better. Executing each candidate schedule in its own txn is a major improvement.

SELECT schedule_id FROM system.scheduled_jobs WHERE next_run < now() ORDER BY random() LIMIT 10;

Why the ORDER BY random()? The effect of this is that we can't push the limit into the scan. But we do end up scanning scheduled_jobs@next_run_idx from (/NULL - /'2022-03-29 12:07:30.689322+00:00'], so this transaction will contend with any other transaction that writes into this range. However, you're now running this scan outside of the read-write scheduling transaction, so this is probably ok.

If we could, would we want this to be:

SELECT schedule_id FROM system.scheduled_jobs WHERE next_run < now() LIMIT 10 FOR UPDATE SKIP LOCKED;

@miretskiy (Contributor, Author)

Going to add a TODO to use SKIP LOCKED when it's ready.

}

timeout := schedulerScheduleExecutionTimeout.Get(&s.Settings.SV)
if processErr := withSavePoint(ctx, txn, func() error {
Reviewer comment (Member):

Do we need this savepoint anymore? Can we just do this whole schedule over?

@miretskiy (Contributor, Author)

I was a bit worried about dropping this. We need to update the schedule when ExecuteJob returns. If it returns an error,
I don't know whether it performed any mutations. I think if we do something like #78466, then we would be able to remove withSavePoint.

@miretskiy (Contributor, Author)

bors r+

@craig (Contributor)

craig bot commented Mar 31, 2022

Build succeeded:

@craig craig bot merged commit eec4ccc into cockroachdb:master Mar 31, 2022
craig bot pushed a commit that referenced this pull request Jun 30, 2022
79134: kv: support FOR {UPDATE,SHARE} SKIP LOCKED r=arulajmani a=nvanbenschoten

KV portion of #40476.
Assists #62734.
Assists #72407.
Assists #78564.

**NOTE: the SQL changes here were extracted from this PR and moved to #83627. This allows us to land the KV portion of this change without exposing it yet.**

```sql
CREATE TABLE kv (k INT PRIMARY KEY, v INT)
INSERT INTO kv VALUES (1, 1), (2, 2), (3, 3)


-- in session 1
BEGIN; UPDATE kv SET v = 0 WHERE k = 1 RETURNING *

  k | v
----+----
  1 | 0


-- in session 2
BEGIN; SELECT * FROM kv ORDER BY k LIMIT 1 FOR UPDATE SKIP LOCKED

  k | v
----+----
  2 | 2


-- in session 3
BEGIN; SELECT * FROM kv FOR UPDATE SKIP LOCKED

  k | v
----+----
  3 | 3
```

These semantics closely match those of FOR {UPDATE,SHARE} SKIP LOCKED in PostgreSQL. With SKIP LOCKED, any selected rows that cannot be immediately locked are skipped. Skipping locked rows provides an inconsistent view of the data, so this is not suitable for general purpose work, but can be used to avoid lock contention with multiple consumers accessing a queue-like table.

[Here](https://www.pgcasts.com/episodes/the-skip-locked-feature-in-postgres-9-5) is a short video that explains why users might want to use SKIP LOCKED in Postgres. The same motivation applies to CockroachDB. However, SKIP LOCKED is not a complete solution to queues, as MVCC garbage will still become a major problem with sufficiently high consumer throughput. Even with a very low gc.ttl, CockroachDB does not garbage collect MVCC garbage fast enough to avoid slowing down consumers that scan from the head of a queue over MVCC tombstones of previously consumed queue entries.
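As a rough illustration of that queue-consumer pattern, a consumer might claim work with something like the following sketch (illustrative only: the kv table matches the example above, and consumeOne / doWork are hypothetical):

```go
package queue

import (
	"context"
	"database/sql"
	"errors"
)

// consumeOne claims a single unlocked queue entry with SKIP LOCKED, processes
// it, and deletes it, all in one transaction. Concurrent consumers skip rows
// that are currently locked by other consumers instead of blocking on them.
func consumeOne(ctx context.Context, db *sql.DB, doWork func(k, v int) error) error {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback()

	var k, v int
	err = tx.QueryRowContext(ctx,
		`SELECT k, v FROM kv ORDER BY k LIMIT 1 FOR UPDATE SKIP LOCKED`).Scan(&k, &v)
	if errors.Is(err, sql.ErrNoRows) {
		return tx.Commit() // nothing unlocked right now; try again later
	} else if err != nil {
		return err
	}
	if err := doWork(k, v); err != nil {
		return err // roll back; the row becomes visible to other consumers again
	}
	if _, err := tx.ExecContext(ctx, `DELETE FROM kv WHERE k = $1`, k); err != nil {
		return err
	}
	return tx.Commit()
}
```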

----

### Implementation

Skip locked has a number of touchpoints in Storage and KV. To understand these, we first need to understand the isolation model of skip-locked. When a request is using a SkipLocked wait policy, it behaves as if run at a weaker isolation level for any keys that it skips over. If the read request does not return a key, it does not make a claim about whether that key does or does not exist or what the key's value was at the read's MVCC timestamp. Instead, it only makes a claim about the set of keys that are returned. For those keys which were not skipped and were returned (and often locked, if combined with a locking strength, though this is not required), serializable isolation is enforced.

When the `pebbleMVCCScanner` is configured with the skipLocked option, it does not include locked keys in the result set. To support this, the MVCC layer needs access to the in-memory lock table, so that it can determine whether keys are locked with an unreplicated lock. Replicated locks are represented as intents, which will be skipped over in getAndAdvance.

Requests using the SkipLocked wait policy acquire the same latches as before and wait on all latches ahead of them in line. However, if a request is using a SkipLocked wait policy, we always perform optimistic evaluation. In Replica.collectSpansRead, SkipLocked reads are able to constrain their read spans down to point reads on just those keys that were returned and were not already locked. This means that there is a good chance that some or all of the write latches that the SkipLocked read would have blocked on won't overlap with the keys that the request ends up returning, so they won't conflict when checking for optimistic conflicts.

Skip locked requests do not scan the lock table when initially sequencing. Instead, they capture a snapshot of the in-memory lock table while sequencing and scan the lock table as they perform their MVCC scan using the btree snapshot stored in the concurrency guard. MVCC was taught about skip locked in the previous commit.

Skip locked requests add point reads for each of the keys returned to the timestamp cache, instead of adding a single ranged read. This satisfies the weaker isolation level of skip locked. Because the issuing transaction is not intending to enforce serializable isolation across keys that were skipped by its request, it does not need to prevent writes below its read timestamp to keys that were skipped.

Similarly, skip locked requests only record refresh spans for the individual keys returned, instead of recording a refresh span across the entire read span. Because the issuing transaction is not intending to enforce serializable isolation across keys that were skipped by its request, it does not need to validate that they have not changed if the transaction ever needs to refresh.

----

### Benchmarking

I haven't done any serious benchmarking with SKIP LOCKED yet, though I'd like to. At some point, I would like to build a simple queue-like workload into the `workload` tool and experiment with various consumer access patterns (non-locking reads, locking reads, skip-locked reads), indexing schemes, concurrency levels (for producers and consumers), and batch sizes.

82915: sql: add locality to system.sql_instances table r=rharding6373 a=rharding6373

This PR adds the column `locality` to the `system.sql_instances` table
that contains the locality (e.g., region) of a SQL instance. The encoded
locality is a string representing the `roachpb.Locality` that may have
been provided when the instance was created.

This change also pipes the locality through `InstanceInfo`. This will
allow us to determine and use the locality information of other SQL
instances, e.g. in DistSQL for multi-tenant locality-aware
distribution planning.

Informs: #80678

Release note (sql change): Table `system.sql_instances` has a new
column, `locality`, that stores the locality of a SQL instance if it was
provided when the instance was started. This exposes a SQL instance's
locality to other instances in the cluster for query planning.


83418: loopvarcapture: do not flag `defer` within local closure r=srosenberg,dhartunian a=renatolabs

Previously, handling of `defer` statements in the `loopvarcapture`
linter was naive: whenever a `defer` statement in the body of a loop
referenced a loop variable, the linter would flag it as an invalid
reference. However, that can be overly restrictive, as a relatively
common idiom is to create literal functions and immediately call them
so as to take advantage of `defer` semantics, as in the example below:

```go
for _, n := range numbers {
    // ...
    func() {
           // ...
           defer func() { doSomething(n) }() // always safe
           // ...
    }()
}
```

The above reference is valid because it is guaranteed to be called
with the correct value for the loop variable.

A similar scenario occurs when a closure is assigned to a local
variable for use within the loop:

```go
for _, n := range numbers {
    // ...
    helper := func() {
           // ...
           defer func() { doSomething(n) }()
           // ...
    }
    // ...
    helper() // always safe
}
```

In the snippet above, calling the `helper` function is also always
safe because the `defer` statement is scoped to the closure containing
it. However, it is still *not* safe to call the helper function from
a goroutine.
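For completeness, here is a sketch of that remaining unsafe case, mirroring the examples above (the linter would still flag this):

```go
for _, n := range numbers {
    // ...
    helper := func() {
           // ...
           defer func() { doSomething(n) }()
           // ...
    }
    // ...
    go helper() // NOT safe: n may have advanced by the time helper runs
}
```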

This commit updates the `loopvarcapture` linter to recognize when a
`defer` statement is safe because it is contained in a local
closure. The two cases illustrated above will no longer be flagged,
allowing for that idiom to be used freely.

Release note: None.

83545: sql/schemachanger: move end to end testing to one test per-file r=fqazi a=fqazi

Previously, we allowed multiple tests per-file for end-to-end
testing inside the declarative schema changer. This was inadequate
because we plan on extending the end-to-end testing to start injecting
additional read/write operations at different stages, which would
make it difficult. To address this, this patch will split tests into
individual files, with one test per file. Additionally, it extends
support to allow multiple statements per-test statement, for transaction
support testing (this is currently unused).

Release note: None

Co-authored-by: Nathan VanBenschoten <[email protected]>
Co-authored-by: rharding6373 <[email protected]>
Co-authored-by: Renato Costa <[email protected]>
Co-authored-by: Faizan Qazi <[email protected]>