diff --git a/docs/tech-notes/jobs.md b/docs/tech-notes/jobs.md
new file mode 100644
index 000000000000..0a85023f29b8
--- /dev/null
+++ b/docs/tech-notes/jobs.md
@@ -0,0 +1,599 @@
+# The Job System
+
+Original authors: Michael Butler & Shiranka Miskin (November 2021)
+
+
+
+## What is a Job?
+
+Jobs are CockroachDB's way of representing long-running background tasks. They
+are used internally as a core part of various features of CockroachDB such as
+Changefeeds, Backups, and Schema Changes. Progress is regularly persisted so
+that if a node fails or the job is paused, another node is still able to
+resume the work later on.
+
+
+## User Control
+
+Users can send `PAUSE`/`RESUME`/`CANCEL` commands to specific jobs via SQL
+commands such as `PAUSE JOB {job_id}` for a single job, to an arbitrary set of
+jobs with commands such as `CANCEL JOBS {select_clause}`, to specific job types
+through commands such as `RESUME ALL CHANGEFEED JOBS`, or to jobs triggered by a
+given [schedule](#scheduled-jobs) through commands such as `PAUSE JOBS FOR
+SCHEDULE {schedule_id}`.
+
+Commands addressed to specific job IDs are handled through
+[`controlJobsNode.startExec`](https://github.com/cockroachdb/cockroach/blob/ae17f3df3448dcf13d4b187f1c45256cfa17d2f7/pkg/sql/control_jobs.go#L113).
+`PAUSE` and `CANCEL` commands move the job to a `pause-requested` and
+`cancel-requested` state respectively, which allows the node currently running
+the job to proceed with actually pausing or cancelling it (see `jobs/cancel` in
+[Job Management](#job-management)). `RESUME` moves the job to a `running` state
+to be picked up by any node (see `jobs/adopt` in [Job
+Management](#job-management)).
+
+Batch commands based on a schedule or job type are simply delegated to the
+respective `{cmd} JOBS` command via
+[`delegateJobControl`](https://github.com/cockroachdb/cockroach/blob/ae17f3df3448dcf13d4b187f1c45256cfa17d2f7/pkg/sql/delegate/job_control.go#L40).
+
+
+## Internal Representation
+
+A Job is represented as a row in the
+[`system.jobs`](https://github.com/cockroachdb/cockroach/blob/7097a9015f1a09c7dee4fbdbcc6bde82121f657b/pkg/sql/catalog/systemschema/system.go#L185)
+table, which is the single source of truth for individual job information. Nodes
+read this table to determine the jobs they can execute, using the
+[`payload`](https://github.com/cockroachdb/cockroach/blob/8a501a247f177bf287bcf34beb4f05155818998c/pkg/jobs/jobspb/jobs.proto#L755)
+to determine how to execute them and the
+[`progress`](https://github.com/cockroachdb/cockroach/blob/8a501a247f177bf287bcf34beb4f05155818998c/pkg/jobs/jobspb/jobs.proto#L815)
+to pick up from where other nodes may have left it. Completed/cancelled jobs are
+[eventually](https://github.com/cockroachdb/cockroach/blob/8a501a247f177bf287bcf34beb4f05155818998c/pkg/jobs/config.go#L107)
+garbage collected from `system.jobs`.
+
+```sql
+CREATE TABLE system.jobs (
+  id INT8 DEFAULT unique_rowid(),
+  created TIMESTAMP NOT NULL DEFAULT now(),
+
+  -- States such as "pending", "running", "paused", "pause-requested", etc.
+  status STRING NOT NULL,
+
+  -- Information specific to each type of job (ex: Changefeed, Backup).
+  payload BYTES NOT NULL, -- inspectable via crdb_internal.pb_to_json('cockroach.sql.jobs.jobspb.Payload', payload)
+  progress BYTES, -- inspectable via crdb_internal.pb_to_json('cockroach.sql.jobs.jobspb.Progress', progress)
+
+  -- Used to track which node currently owns execution of the job
+  claim_session_id BYTES,
+  claim_instance_id INT8,
+
+  -- The system that created the job and the id relevant to that system (ex: crdb_schedule and the schedule_id)
+  created_by_type STRING,
+  created_by_id INT,
+
+  -- Useful for observability
+  num_runs INT8,
+  last_run TIMESTAMP,
+
+  ... -- constraint/index/family
+)
+```
+
+Jobs are primarily specified and differentiated through their `payload` column,
+which contains bytes of type
+[`jobspb.Payload`](https://github.com/cockroachdb/cockroach/blob/8a501a247f177bf287bcf34beb4f05155818998c/pkg/jobs/jobspb/jobs.proto#L755),
+storing information related to the type, parameters, and metadata of the job.
+
+```protobuf
+message Payload {
+  // General information relevant to any job
+  string description = 1;
+  repeated string statement = 16;
+  int64 started_micros = 3;
+  int64 finished_micros = 4;
+  ...
+  repeated errorspb.EncodedError resume_errors = 17;
+  repeated errorspb.EncodedError cleanup_errors = 18;
+  errorspb.EncodedError final_resume_error = 19;
+  ...
+  // Type-specific payload details that mark the type of job and store type-specific information
+  oneof details {
+    BackupDetails backup = 10;
+    RestoreDetails restore = 11;
+    ... // Details for the many other types of jobs
+  }
+  ...
+}
+```
+
+The progress of the specified job through its lifecycle is tracked in a separate
+`progress` column of type
+[`jobspb.Progress`](https://github.com/cockroachdb/cockroach/blob/8a501a247f177bf287bcf34beb4f05155818998c/pkg/jobs/jobspb/jobs.proto#L815).
+
+```protobuf
+message Progress {
+  // Generic information for observability
+  oneof progress {
+    float fraction_completed = 1;
+    util.hlc.Timestamp high_water = 3;
+  }
+  string running_status = 4;
+
+  // Type-specific progress information similar to Payload details
+  oneof details { // Note that while this is also called details, it does not take {JobType}Details structs like Payload
+    BackupProgress backup = 10;
+    RestoreProgress restore = 11;
+    ... // Progress for the many other types of jobs
+  }
+  ...
+}
+```
+
+A more user-readable version of `system.jobs` with the parsed `payload` and
+`progress` information can be monitored through the
+[`crdb_internal.jobs`](https://github.com/cockroachdb/cockroach/blob/8a501a247f177bf287bcf34beb4f05155818998c/pkg/sql/crdb_internal.go#L660)
+table (which simply reads from `system.jobs`). The [`SHOW
+JOBS`](https://www.cockroachlabs.com/docs/stable/show-jobs.html) command
+operates by [reading from
+`crdb_internal.jobs`](https://github.com/cockroachdb/cockroach/blob/8a501a247f177bf287bcf34beb4f05155818998c/pkg/sql/delegate/show_jobs.go#L23).
+
+A Job is created when a [new row is
+inserted](https://github.com/cockroachdb/cockroach/blob/7097a9015f1a09c7dee4fbdbcc6bde82121f657b/pkg/jobs/registry.go#L550)
+into `system.jobs`. Once written, it can be "claimed" by any node in the
+cluster (asserting that it is responsible for executing the job) through the
+[setting of `claim_session_id` and
+`claim_instance_id`](https://github.com/cockroachdb/cockroach/blob/8a501a247f177bf287bcf34beb4f05155818998c/pkg/jobs/adopt.go#L48)
+in the job record.
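+
+At any point in this lifecycle, the raw protobuf columns can be decoded from
+SQL via `crdb_internal.pb_to_json`, as the comments in the schema above
+suggest. A minimal sketch (the `WHERE` clause is illustrative only):
+
+```sql
+-- Decode a job's payload and progress into JSON for ad-hoc inspection.
+SELECT id,
+       status,
+       crdb_internal.pb_to_json('cockroach.sql.jobs.jobspb.Payload', payload) AS payload,
+       crdb_internal.pb_to_json('cockroach.sql.jobs.jobspb.Progress', progress) AS progress
+FROM system.jobs
+WHERE status = 'running';
+```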
+
+Job execution ends when the status changes from `running` to
+`success`, `cancelled`, or `failure`. The job record eventually gets
+[deleted](https://github.com/cockroachdb/cockroach/blob/7097a9015f1a09c7dee4fbdbcc6bde82121f657b/pkg/jobs/registry.go#L1000)
+from the system table -- by default, 14 days after execution ends.
+
+If the claiming node fails to completely execute the job (because the node
+died or the job was paused), the job record remains in the table and can be
+resumed by any other node in the cluster. The mechanism by which this is done
+is the [`JobRegistry`](#the-job-registry).
+
+
+## The Job Registry
+A node interacts with the jobs table through the [`JobRegistry`](https://github.com/cockroachdb/cockroach/blob/7097a9015f1a09c7dee4fbdbcc6bde82121f657b/pkg/jobs/registry.go#L92)
+struct.
+
+
+### Job Creation
+A node creates a job by calling the Registry's
+[`CreateJobWithTxn`](https://github.com/cockroachdb/cockroach/blob/8a501a247f177bf287bcf34beb4f05155818998c/pkg/jobs/registry.go#L523)
+or
+[`CreateAdoptableJobWithTxn`](https://github.com/cockroachdb/cockroach/blob/8a501a247f177bf287bcf34beb4f05155818998c/pkg/jobs/registry.go#L560),
+which
+[`INSERT`s](https://github.com/cockroachdb/cockroach/blob/8a501a247f177bf287bcf34beb4f05155818998c/pkg/jobs/registry.go#L550)
+a new row into `system.jobs`. The node passes the job-specific
+information to these functions via the
+[`Record`](https://github.com/cockroachdb/cockroach/blob/8a501a247f177bf287bcf34beb4f05155818998c/pkg/jobs/jobs.go#L91)
+struct.
+
+If the node created the job with `CreateJobWithTxn`, it will also claim the job by setting
+the claim IDs and will
+[start](https://github.com/cockroachdb/cockroach/blob/8a501a247f177bf287bcf34beb4f05155818998c/pkg/jobs/jobs.go#L890)
+the job. By contrast, `CreateAdoptableJobWithTxn` allows another node to adopt and
+[resume](https://github.com/cockroachdb/cockroach/blob/8a501a247f177bf287bcf34beb4f05155818998c/pkg/jobs/adopt.go#L246)
+the job (`startableJob.Start()` is never called on these jobs).
+
+
+### Job Management
+While job creation is triggered in response to client queries such as
+[`BACKUP`](#backup), job adoption, cancellation, and deletion are managed
+through daemon goroutines. These daemon goroutines continually run
+on each node, allowing any node to participate in picking up work, unassigning
+jobs that were on nodes that failed, and cleaning up old job records from the
+table.
+
+These goroutines begin when each node's SQL Server
+[starts](https://github.com/cockroachdb/cockroach/blob/8a501a247f177bf287bcf34beb4f05155818998c/pkg/server/server.go#L1876),
+initializing the `JobRegistry` via
+[`jobRegistry.Start(...)`](https://github.com/cockroachdb/cockroach/blob/8a501a247f177bf287bcf34beb4f05155818998c/pkg/server/server_sql.go#L1083).
+Specifically, `jobRegistry.Start()` kicks off the following goroutines:
+
+- **[`jobs/adopt`](https://github.com/cockroachdb/cockroach/blob/8a501a247f177bf287bcf34beb4f05155818998c/pkg/jobs/registry.go#L896)**
+  : At the
+  [`jobs.registry.interval.adopt`](https://github.com/cockroachdb/cockroach/blob/8a501a247f177bf287bcf34beb4f05155818998c/pkg/jobs/config.go#L82)
+  interval (default 30s), call
+  [`claimJobs`](https://github.com/cockroachdb/cockroach/blob/8a501a247f177bf287bcf34beb4f05155818998c/pkg/jobs/adopt.go#L91)
+  to poll
+  `system.jobs` and attempt to
+  [`UPDATE`](https://github.com/cockroachdb/cockroach/blob/8a501a247f177bf287bcf34beb4f05155818998c/pkg/jobs/adopt.go#L48)
+  up to `$COCKROACH_JOB_ADOPTIONS_PER_PERIOD` (default 10) rows without an
+  assigned `claim_session_id` with the current node's session and instance IDs.
+  After [claiming](https://github.com/cockroachdb/cockroach/blob/8a501a247f177bf287bcf34beb4f05155818998c/pkg/jobs/adopt.go#L48)
+  new jobs, the node [resumes them](https://github.com/cockroachdb/cockroach/blob/8a501a247f177bf287bcf34beb4f05155818998c/pkg/jobs/adopt.go#L194).
+
+
+- **[`jobs/cancel`](https://github.com/cockroachdb/cockroach/blob/8a501a247f177bf287bcf34beb4f05155818998c/pkg/jobs/registry.go#L839)**
+  : At the
+  [`jobs.registry.interval.cancel`](https://github.com/cockroachdb/cockroach/blob/8a501a247f177bf287bcf34beb4f05155818998c/pkg/jobs/config.go#L91)
+  interval (default 10s), [set `claim_session_id =
+  NULL`](https://github.com/cockroachdb/cockroach/blob/8a501a247f177bf287bcf34beb4f05155818998c/pkg/jobs/registry.go#L749)
+  for up to
+  [`jobs.cancel_update_limit`](https://github.com/cockroachdb/cockroach/blob/8a501a247f177bf287bcf34beb4f05155818998c/pkg/jobs/config.go#L114)
+  jobs with session IDs that fail a [SQL Liveness
+  check](https://github.com/cockroachdb/cockroach/blob/8a501a247f177bf287bcf34beb4f05155818998c/pkg/sql/sqlliveness/slstorage/slstorage.go#L190).
+  See the [SQL Liveness RFC](https://github.com/cockroachdb/cockroach/blob/master/docs/RFCS/20200615_sql_liveness.md)
+  for more information on liveness and claim IDs. Jobs claimed by the current node in the `pause-requested`
+  and `cancel-requested` states are also
+  [transitioned](https://github.com/cockroachdb/cockroach/blob/ae17f3df3448dcf13d4b187f1c45256cfa17d2f7/pkg/jobs/adopt.go#L430)
+  to the `PAUSED` and `REVERTING` states by the [same daemon
+  goroutine](https://github.com/cockroachdb/cockroach/blob/ae17f3df3448dcf13d4b187f1c45256cfa17d2f7/pkg/jobs/registry.go#L815).
+
+- **[`jobs/gc`](https://github.com/cockroachdb/cockroach/blob/8a501a247f177bf287bcf34beb4f05155818998c/pkg/jobs/registry.go#L863)**
+  : At the
+  [`jobs.registry.interval.gc`](https://github.com/cockroachdb/cockroach/blob/8a501a247f177bf287bcf34beb4f05155818998c/pkg/jobs/config.go#L99)
+  interval (default 1h), query for jobs with a `status` of
+  [Succeeded/Cancelled/Failed](https://github.com/cockroachdb/cockroach/blob/8a501a247f177bf287bcf34beb4f05155818998c/pkg/jobs/registry.go#L983)
+  that stopped running more than
+  [`jobs.retention_time`](https://github.com/cockroachdb/cockroach/blob/8a501a247f177bf287bcf34beb4f05155818998c/pkg/jobs/config.go#L107)
+  (default 2 weeks) ago and
+  [`DELETE`](https://github.com/cockroachdb/cockroach/blob/8a501a247f177bf287bcf34beb4f05155818998c/pkg/jobs/registry.go#L1000)
+  them.
+  - Since the finished time is stored in the `finished_micros` field of the
+    `Payload` protobuf and cannot be read directly by SQL, up to
+    [100](https://github.com/cockroachdb/cockroach/blob/8a501a247f177bf287bcf34beb4f05155818998c/pkg/jobs/registry.go#L935)
+    job records are
+    [`SELECT`ed](https://github.com/cockroachdb/cockroach/blob/8a501a247f177bf287bcf34beb4f05155818998c/pkg/jobs/registry.go#L951)
+    and their `payload`s are
+    [unmarshalled](https://github.com/cockroachdb/cockroach/blob/8a501a247f177bf287bcf34beb4f05155818998c/pkg/jobs/registry.go#L977)
+    to filter on the finished time.
+
+
+### Job Execution
+
+Each job type registers its respective execution logic through
+[`jobs.RegisterConstructor`](https://github.com/cockroachdb/cockroach/blob/8a501a247f177bf287bcf34beb4f05155818998c/pkg/jobs/registry.go#L1153),
+which globally registers a
+[`Constructor`](https://github.com/cockroachdb/cockroach/blob/8a501a247f177bf287bcf34beb4f05155818998c/pkg/jobs/registry.go#L1148)
+function for a given
+[`jobspb.Type`](https://github.com/cockroachdb/cockroach/blob/8a501a247f177bf287bcf34beb4f05155818998c/pkg/jobs/jobspb/jobs.pb.go#L103).
+This `Constructor` returns an implementation of
+[`jobs.Resumer`](https://github.com/cockroachdb/cockroach/blob/8a501a247f177bf287bcf34beb4f05155818998c/pkg/jobs/registry.go#L1111)
+with the `Resume` and `OnFailOrCancel` functions for the registry to execute.
+
+When a job is adopted and
+[resumed](https://github.com/cockroachdb/cockroach/blob/8a501a247f177bf287bcf34beb4f05155818998c/pkg/jobs/adopt.go#L246)
+by a node's registry, unless the job is already running or has already completed,
+[`runJob`](https://github.com/cockroachdb/cockroach/blob/8a501a247f177bf287bcf34beb4f05155818998c/pkg/jobs/adopt.go#L367)
+is run in its own goroutine as an [async
+task](https://github.com/cockroachdb/cockroach/blob/8a501a247f177bf287bcf34beb4f05155818998c/pkg/jobs/adopt.go#L333).
+During adoption a [new context is
+created](https://github.com/cockroachdb/cockroach/blob/8a501a247f177bf287bcf34beb4f05155818998c/pkg/jobs/registry.go#L662)
+for the `runJob` execution with its own
+[`cancel`](https://github.com/cockroachdb/cockroach/blob/8a501a247f177bf287bcf34beb4f05155818998c/pkg/jobs/registry.go#L259)
+function that is stored in the
+registry's internal
+[map](https://github.com/cockroachdb/cockroach/blob/8a501a247f177bf287bcf34beb4f05155818998c/pkg/jobs/registry.go#L143)
+of [adopted
+jobs](https://github.com/cockroachdb/cockroach/blob/8a501a247f177bf287bcf34beb4f05155818998c/pkg/jobs/registry.go#L52).
+This cancel callback allows the registry to remotely terminate the
+job when it must
+[`servePauseAndCancelRequests`](https://github.com/cockroachdb/cockroach/blob/8a501a247f177bf287bcf34beb4f05155818998c/pkg/jobs/adopt.go#L443).
+
+This goroutine handles executing the job by modeling it as a state machine through
+the registry's
+[`stepThroughStateMachine`](https://github.com/cockroachdb/cockroach/blob/8a501a247f177bf287bcf34beb4f05155818998c/pkg/jobs/registry.go#L1174),
+starting from the `StatusRunning` state.
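+
+Before following the state machine further, here is a job type's side of the
+contract: a minimal, hypothetical `Resumer` (the `ping*` names are invented
+for illustration, and the signatures are simplified from the linked registry
+code):
+
+```go
+import (
+	"context"
+
+	"github.com/cockroachdb/cockroach/pkg/jobs"
+	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
+	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
+)
+
+// pingResumer is a hypothetical Resumer implementation.
+type pingResumer struct {
+	job *jobs.Job
+}
+
+// Resume holds the job-specific execution logic. The registry calls it when
+// this node adopts the job; ctx is cancelled on pause or cancel requests.
+func (r *pingResumer) Resume(ctx context.Context, execCtx interface{}) error {
+	// ...do the actual work, persisting progress along the way...
+	return nil
+}
+
+// OnFailOrCancel reverts any partial work when the job fails or is cancelled.
+func (r *pingResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error {
+	return nil
+}
+
+func init() {
+	// Tie a job type to its Resumer so that any node can execute it. A real
+	// job would register its own jobspb.Type value here.
+	jobs.RegisterConstructor(jobspb.TypeBackup,
+		func(job *jobs.Job, _ *cluster.Settings) jobs.Resumer {
+			return &pingResumer{job: job}
+		})
+}
+```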
+From the `running` state, the registry [calls
+resumer.Resume](https://github.com/cockroachdb/cockroach/blob/8a501a247f177bf287bcf34beb4f05155818998c/pkg/jobs/registry.go#L1209)
+to execute the job-specific logic, and once the function completes it
+recursively calls `stepThroughStateMachine` to transition to the appropriate
+next state, depending on whether the resumption completed successfully or
+failed (either through the job's own execution or through a context error
+triggered by `servePauseAndCancelRequests`). As it transitions through the
+state machine it updates job state accordingly and potentially calls
+`resumer.OnFailOrCancel` for job-specific handling. Eventually the original
+`stepThroughStateMachine` exits and the `runJob` goroutine can complete.
+
+
+## Scheduled Jobs
+
+Jobs on their own are triggered by specific user commands such as `BACKUP` or
+`CREATE CHANGEFEED`; however, CockroachDB also supports scheduling jobs to run
+in the future and recur based on a crontab schedule. As of this writing, this
+is only used for backups with `CREATE SCHEDULE FOR BACKUP`.
+
+Similar to Jobs, Schedules can be `PAUSE`d, `RESUME`d, or `DROP`ped via `{cmd}
+SCHEDULES {select_clause}` and are handled by
+[`controlSchedulesNode.startExec`](https://github.com/cockroachdb/cockroach/blob/ae17f3df3448dcf13d4b187f1c45256cfa17d2f7/pkg/sql/control_schedules.go#L136).
+`PAUSE` and `RESUME` simply set the `next_run` of the schedule to either
+an empty value or the next iteration according to the `schedule_expr`, while
+`DROP` deletes the record from the table.
+
+Job Schedules are stored in the
+[`system.scheduled_jobs`](https://github.com/cockroachdb/cockroach/blob/ae17f3df3448dcf13d4b187f1c45256cfa17d2f7/pkg/sql/catalog/systemschema/system.go#L432)
+table. A more user-readable version can be viewed using the [`SHOW
+SCHEDULES`](https://www.cockroachlabs.com/docs/stable/show-schedules.html) SQL
+statement.
+
+```sql
+CREATE TABLE system.scheduled_jobs (
+  schedule_id INT DEFAULT unique_rowid() NOT NULL,
+  schedule_name STRING NOT NULL,
+  created TIMESTAMPTZ NOT NULL DEFAULT now(),
+  owner STRING NOT NULL,
+  next_run TIMESTAMPTZ, -- the next scheduled run of the job in UTC, NULL if paused
+  schedule_state BYTES, -- inspectable via crdb_internal.pb_to_json('cockroach.jobs.jobspb.ScheduleState', schedule_state)
+  schedule_expr STRING, -- job schedule in crontab format, if empty the schedule will not recur
+  schedule_details BYTES, -- inspectable via crdb_internal.pb_to_json('cockroach.jobs.jobspb.ScheduleDetails', schedule_details)
+  executor_type STRING NOT NULL,
+  execution_args BYTES NOT NULL,
+  ...
+)
+```
+
+The schedule's behavior and current status are stored in the
+`schedule_details` and `schedule_state` columns respectively, which use the
+following protobuf formats:
+
+```protobuf
+// How to schedule and execute the job.
+message ScheduleDetails {
+  // How to handle encountering a previously started job that hasn't completed yet
+  enum WaitBehavior {
+    WAIT = 0; // Wait for the previous run to complete, then run
+    NO_WAIT = 1; // Do not wait and just run a potentially overlapping execution
+    SKIP = 2; // If the previous run is ongoing, just advance the schedule without running
+  }
+
+  // How to proceed if the scheduled job fails
+  enum ErrorHandlingBehavior {
+    RETRY_SCHED = 0; // Allow the job to execute again the next time it's scheduled to do so
+    RETRY_SOON = 1; // Retry the job almost immediately after failure
+    PAUSE_SCHED = 2; // Pause the schedule entirely
+  }
+
+  WaitBehavior wait = 1;
+  ErrorHandlingBehavior on_error = 2;
+}
+
+// Mutable state for the schedule such as error strings
+message ScheduleState {
+  string status = 1;
+}
+```
+
+A CRDB node writes to a row in the scheduled jobs table through the [`ScheduledJob`](https://github.com/cockroachdb/cockroach/blob/ae17f3df3448dcf13d4b187f1c45256cfa17d2f7/pkg/jobs/scheduled_job.go#L55)
+struct by queuing up individual changes in its `dirty` property and then either [creating](https://github.com/cockroachdb/cockroach/blob/ae17f3df3448dcf13d4b187f1c45256cfa17d2f7/pkg/jobs/scheduled_job.go#L365)
+or [updating](https://github.com/cockroachdb/cockroach/blob/ae17f3df3448dcf13d4b187f1c45256cfa17d2f7/pkg/jobs/scheduled_job.go#L398)
+a row in the table to commit them. Next, we'll discuss when a CRDB node would write to this table.
+
+
+### Schedule Creation
+
+A [`ScheduledJob`](https://github.com/cockroachdb/cockroach/blob/ae17f3df3448dcf13d4b187f1c45256cfa17d2f7/pkg/jobs/scheduled_job.go#L55) struct is first created via
+[`NewScheduledJob`](https://github.com/cockroachdb/cockroach/blob/ae17f3df3448dcf13d4b187f1c45256cfa17d2f7/pkg/jobs/scheduled_job.go#L72)
+(ex: in
+[`makeBackupSchedule`](https://github.com/cockroachdb/cockroach/blob/ae17f3df3448dcf13d4b187f1c45256cfa17d2f7/pkg/ccl/backupccl/create_scheduled_backup.go#L595)),
+initializing the struct, whose properties can then be set using the
+setter functions. Finally, the data in the struct is persisted into the
+`scheduled_jobs` table via
+[`ScheduledJob.Create`](https://github.com/cockroachdb/cockroach/blob/ae17f3df3448dcf13d4b187f1c45256cfa17d2f7/pkg/jobs/scheduled_job.go#L365).
+
+
+### Schedule Management
+
+When a SQLServer initializes, it calls
+[`StartJobSchedulerDaemon`](https://github.com/cockroachdb/cockroach/blob/ae17f3df3448dcf13d4b187f1c45256cfa17d2f7/pkg/server/server_sql.go#L1163)
+to start the
+[`job-scheduler`](https://github.com/cockroachdb/cockroach/blob/ae17f3df3448dcf13d4b187f1c45256cfa17d2f7/pkg/jobs/job_scheduler.go#L355)
+async task, similar to other job background tasks.
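+
+Conceptually, each pass of this daemon boils down to polling
+`system.scheduled_jobs` for rows that are due, along the lines of the
+following sketch (the real implementation builds a more careful query, so
+treat this as an approximation):
+
+```sql
+-- Approximate shape of the scheduler's per-pass poll.
+SELECT schedule_id, executor_type, execution_args
+FROM system.scheduled_jobs
+WHERE next_run < current_timestamp()
+LIMIT 10; -- jobs.scheduler.max_jobs_per_iteration
+```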
+
+Unless
+[`jobs.scheduler.enabled`](https://github.com/cockroachdb/cockroach/blob/ae17f3df3448dcf13d4b187f1c45256cfa17d2f7/pkg/jobs/job_scheduler.go#L387)
+is false, on an interval of
+[`jobs.scheduler.pace`](https://github.com/cockroachdb/cockroach/blob/ae17f3df3448dcf13d4b187f1c45256cfa17d2f7/pkg/jobs/job_scheduler.go#L393)
+(default 1 minute), the node will attempt to
+[process](https://github.com/cockroachdb/cockroach/blob/ae17f3df3448dcf13d4b187f1c45256cfa17d2f7/pkg/jobs/job_scheduler.go#L132)
+up to
+[`jobs.scheduler.max_jobs_per_iteration`](https://github.com/cockroachdb/cockroach/blob/ae17f3df3448dcf13d4b187f1c45256cfa17d2f7/pkg/jobs/job_scheduler.go#L399)
+schedules (default 10) with a `next_run` timestamp earlier than the current time.
+
+
+If no jobs have successfully started for the current execution of the schedule,
+a type-specific executor will be called to queue the appropriate jobs to be
+executed (ex:
+[`executeBackup`](https://github.com/cockroachdb/cockroach/blob/ae17f3df3448dcf13d4b187f1c45256cfa17d2f7/pkg/ccl/backupccl/schedule_exec.go#L63)).
+
+Once jobs created by this schedule are observed, the `next_run` of the schedule
+will be advanced by
+[`processSchedule`](https://github.com/cockroachdb/cockroach/blob/ae17f3df3448dcf13d4b187f1c45256cfa17d2f7/pkg/jobs/job_scheduler.go#L132)
+according to the `schedule_expr`, with some added jitter to avoid conflicting
+transactions.
+
+The executor for a given type of Job Schedule is registered via the
+[`RegisterScheduledJobExecutorFactory`](https://github.com/cockroachdb/cockroach/blob/ae17f3df3448dcf13d4b187f1c45256cfa17d2f7/pkg/jobs/scheduled_job_executor.go#L81)
+function (ex: [registering
+ScheduledBackupExecutor](https://github.com/cockroachdb/cockroach/blob/ae17f3df3448dcf13d4b187f1c45256cfa17d2f7/pkg/ccl/backupccl/schedule_exec.go#L470)).
+The factory must return a struct that implements the
+[`ScheduledJobExecutor`](https://github.com/cockroachdb/cockroach/blob/ae17f3df3448dcf13d4b187f1c45256cfa17d2f7/pkg/jobs/scheduled_job_executor.go#L26)
+interface.
+
+```go
+type ScheduledJobExecutor interface {
+	ExecuteJob(...) error
+	NotifyJobTermination(...) error
+	Metrics() metric.Struct
+	GetCreateScheduleStatement(...) (string, error)
+}
+```
+
+NB: Jobs created by a schedule will have their `created_by_type` and `created_by_id`
+columns set to that of the schedule that created them.
+
+
+# Applications of the Job System
+In this section, we detail the code paths taken by CRDB SQL statements that spawn jobs, such as
+BACKUP, CHANGEFEED, and Schema Changes (e.g. ALTER).
+
+## Job Planning
+SQL statements that rely on the job system begin to branch from the conventional
+life of a query during [Logical Planning and Optimization](https://github.com/cockroachdb/cockroach/blob/master/docs/tech-notes/life_of_a_query.md#logical-planning-and-optimization)
+(read through this section first!). Specifically, after following a few internal calls from
+`makeExecPlan`, we call [`tryBuildOpaque`](https://github.com/cockroachdb/cockroach/blob/df67aa9707fbf0193ec8b3ca4062240c360fc808/pkg/sql/opt/optbuilder/opaque.go#L64)
+to convert an AST into a memo.
+  - Detailed Note: `tryBuildOpaque` will match the statement to one
+of the opaqueStatements, a map populated by [opaque.go's `init()`](https://github.com/cockroachdb/cockroach/blob/31db44da69bf21e67ebbef6fbb8c8bfb2e498efe/pkg/sql/opaque.go#L296),
+and then call
+[`buildOpaque`](https://github.com/cockroachdb/cockroach/blob/31db44da69bf21e67ebbef6fbb8c8bfb2e498efe/pkg/sql/opaque.go#L38)
+through an indirection [here](https://github.com/cockroachdb/cockroach/blob/df67aa9707fbf0193ec8b3ca4062240c360fc808/pkg/sql/opt/optbuilder/opaque.go#L69).
+Each kind of job will take a different path through `buildOpaque`, which we'll discuss in more
+detail later.
+
+Further, unlike the normal logical planning path for SQL queries,
+`tryBuildOpaque` [skips](https://github.com/cockroachdb/cockroach/blob/df67aa9707fbf0193ec8b3ca4062240c360fc808/pkg/sql/opt/optbuilder/opaque.go#L75)
+Cockroach's optimization engine and returns a populated `memo` object. Why skip query
+optimization? Cockroach SQL statements that spawn jobs don't contain any
+subqueries or additional operators that would benefit from query optimization
+(e.g. JOIN, SELECT, WHERE).
+
+Finally, just like any regular query, the job's memo object is then
+[converted](https://github.com/cockroachdb/cockroach/blob/df67aa9707fbf0193ec8b3ca4062240c360fc808/pkg/sql/plan_opt.go#L262)
+(specifically
+[here](https://github.com/cockroachdb/cockroach/blob/fc67a0c9202af348e919afc1e1f70acc9a83b300/pkg/sql/opt/exec/execbuilder/relational.go#L2355))
+into a query plan, and
+[`execWithDistSQLEngine`](https://github.com/cockroachdb/cockroach/blob/fc67a0c9202af348e919afc1e1f70acc9a83b300/pkg/sql/conn_executor_exec.go#L925-L927)
+starts the job, specifically by spawning a new goroutine
+[here](https://github.com/cockroachdb/cockroach/blob/df67aa9707fbf0193ec8b3ca4062240c360fc808/pkg/sql/planhook.go#L139)
+(**not sure if all jobs go through hookfn node**). The exact function run by the
+goroutine varies by job. Note that, unlike many other [query
+plans](https://github.com/cockroachdb/cockroach/blob/master/docs/tech-notes/life_of_a_query.md#physical-planning-and-execution),
+the DistSQL engine will not create a distributed physical plan for the job (which
+you can verify by checking the
+[`distributePlan`](https://github.com/cockroachdb/cockroach/blob/fc67a0c9202af348e919afc1e1f70acc9a83b300/pkg/sql/conn_executor_exec.go#L916)
+variable); rather, each specific job will manually distribute work across nodes
+during execution. Next we'll discuss how certain CRDB features spawn jobs in the DistSQL Engine.
+
+### CCL Job Planning
+A subset of CRDB features -- BACKUP, RESTORE, IMPORT,
+CHANGEFEED -- are licensed under the [Cockroach Community License (CCL)](https://www.cockroachlabs.com/docs/stable/licensing-faqs.html)
+and plan jobs in a similar fashion.
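+
+Before diving into the details, here is a deliberately simplified, hypothetical
+sketch of the pattern these features share; the real hooks live in
+`pkg/sql/planhook.go` and have richer signatures, which the next paragraphs
+walk through:
+
+```go
+import "context"
+
+// planHookRowFn mirrors the role of sql.PlanHookRowFn: it runs inside the
+// DistSQL engine and does the actual work for the statement.
+type planHookRowFn func(ctx context.Context) error
+
+// fooPlanHook mirrors the role of a CCL planHookFn for a hypothetical FOO
+// statement: decide whether the statement is ours, and if so return the row
+// function that plans the job and writes its record.
+func fooPlanHook(stmt string) (planHookRowFn, bool) {
+	if stmt != "FOO" {
+		return nil, false // not our statement; other hooks get a chance
+	}
+	fn := func(ctx context.Context) error {
+		// 1. Resolve targets and check privileges.
+		// 2. Build the type-specific jobspb details.
+		// 3. Write the job record using the statement's transaction; for
+		//    non-detached bulk jobs, commit, start the job, and wait on it.
+		return nil
+	}
+	return fn, true
+}
+```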
+
+During logical planning, each CCL job [executes](https://github.com/cockroachdb/cockroach/blob/31db44da69bf21e67ebbef6fbb8c8bfb2e498efe/pkg/sql/opaque.go#L204)
+its own specific [planHookFn](https://github.com/cockroachdb/cockroach/blob/31db44da69bf21e67ebbef6fbb8c8bfb2e498efe/pkg/sql/planhook.go#L40)
+that, most importantly, returns a `PlanHookRowFn`, a function which in turn gets
+called through the [`execWithDistSQLEngine`](https://github.com/cockroachdb/cockroach/blob/fc67a0c9202af348e919afc1e1f70acc9a83b300/pkg/sql/conn_executor_exec.go#L925-L927)
+stack [here](https://github.com/cockroachdb/cockroach/blob/df67aa9707fbf0193ec8b3ca4062240c360fc808/pkg/sql/planhook.go#L139).
+The `PlanHookRowFn` is responsible for planning the job and writing a job record to
+the jobs table (i.e. creating the job).
+
+
+There are some common themes to note in a `planHookRowFn`.
+We'll hyperlink to `backup_planning.go` to illustrate.
+- Throughout job planning, several (but not all) read and write requests get sent to the kv layer
+using [`p.ExtendedEvalContext().Txn`](https://github.com/cockroachdb/cockroach/blob/28bb1ea049da5bfb6e15a7003cd7b678cbc4b67f/pkg/ccl/backupccl/backup_planning.go#L1134),
+a transaction handle scoped to the entire execution of the SQL statement, accessed via the
+[planHookState](https://github.com/cockroachdb/cockroach/blob/master/pkg/sql/planhook.go#L70)
+interface. **Most notably, we [write](https://github.com/cockroachdb/cockroach/blob/28bb1ea049da5bfb6e15a7003cd7b678cbc4b67f/pkg/ccl/backupccl/backup_planning.go#L1108)
+the [job record](https://github.com/cockroachdb/cockroach/blob/28bb1ea049da5bfb6e15a7003cd7b678cbc4b67f/pkg/ccl/backupccl/backup_planning.go#L1088)
+to the jobs table using this txn**. When we commit this transaction, all operations will commit;
+else, all operations roll back. To read something quickly and transactionally during planning,
+`planHookRowFn` often invokes a new transaction (e.g. [1](https://github.com/cockroachdb/cockroach/blob/master/pkg/ccl/backupccl/backup_planning.go#L880),
+[2](https://github.com/cockroachdb/cockroach/blob/master/pkg/ccl/backupccl/backup_planning.go#L815))
+using `p.ExecutorConfig.DB`. If you're unfamiliar with transactions in CRDB, read the
+[KV Client Interface](https://github.com/cockroachdb/cockroach/blob/master/docs/tech-notes/life_of_a_query.md#the-kv-client-interface)
+section of Life of a Query.
+- By default, bulk jobs (IMPORT, BACKUP, and RESTORE) [commit](https://github.com/cockroachdb/cockroach/blob/28bb1ea049da5bfb6e15a7003cd7b678cbc4b67f/pkg/ccl/backupccl/backup_planning.go#L1163)
+the transaction, then [start](https://github.com/cockroachdb/cockroach/blob/28bb1ea049da5bfb6e15a7003cd7b678cbc4b67f/pkg/ccl/backupccl/backup_planning.go#L1169)
+and wait for the job to complete, all within the `planHookRowFn`!
+Indeed, the SQL shell will not return until after the job completes!
+  - Advanced Note: These jobs use an implicit transaction to [control](https://github.com/cockroachdb/cockroach/blob/28bb1ea049da5bfb6e15a7003cd7b678cbc4b67f/pkg/ccl/backupccl/backup_planning.go#L1139)
+when the transaction commits / rolls back outside the Txn API.
+When bulk jobs run with the
+[`detached` parameter](https://github.com/cockroachdb/cockroach/blob/28bb1ea049da5bfb6e15a7003cd7b678cbc4b67f/pkg/ccl/backupccl/backup_planning.go#L1104)
+instead, the `planHookRowFn` returns right after writing the job record to the jobs table.
+By contrast, `p.ExtendedEvalContext().Txn` uses an _explicit_ transaction and
+[commits](https://github.com/cockroachdb/cockroach/blob/fc67a0c9202af348e919afc1e1f70acc9a83b300/pkg/sql/conn_executor_exec.go#L776)
+in connExecutor land.
+
+### Schema Changes Job Planning
+TODO (hopefully by someone on SQL Schema :))))
+
+## Job Execution
+Next, we describe how various kinds of jobs work; i.e., when a node picks up a Backup or
+Changefeed job, what does it actually do? This will be more high-level, and will point towards
+RFCs, docs, and blog posts for further details.
+TODO(Shiranka): CDC
+TODO(???): Schema Changes
+### Backup
+TODO(MB)
+
+### Restore
+Before reading through this code walkthrough of restore, watch the [MOLTing tutorial](https://cockroachlabs.udemy.com/course/general-onboarding/learn/lecture/22164146#overview)
+on restore. This walkthrough doesn't consider the distinct code paths each type
+of RESTORE may take.
+
+#### Planning
+In addition to general CCL job planning, [planning a restore](https://github.com/cockroachdb/cockroach/blob/4149ca74099cee7a698fcade6d8ba6891f47dfed/pkg/ccl/backupccl/restore_planning.go#L1771)
+has the following main components.
+- [Resolve](https://github.com/cockroachdb/cockroach/blob/4149ca74099cee7a698fcade6d8ba6891f47dfed/pkg/ccl/backupccl/restore_planning.go#L1826)
+  the location of the backup files we seek to restore.
+- [Figure out](https://github.com/cockroachdb/cockroach/blob/4149ca74099cee7a698fcade6d8ba6891f47dfed/pkg/ccl/backupccl/restore_planning.go#L1910)
+  which descriptors the user wants to restore. Descriptors are objects that hold metadata about
+  various SQL objects, like [columns](https://github.com/cockroachdb/cockroach/blob/4149ca74099cee7a698fcade6d8ba6891f47dfed/pkg/sql/catalog/descpb/structured.proto#L123)
+  or [databases](https://github.com/cockroachdb/cockroach/blob/4149ca74099cee7a698fcade6d8ba6891f47dfed/pkg/sql/catalog/descpb/structured.proto#L1275).
+- [Allocate](https://github.com/cockroachdb/cockroach/blob/4149ca74099cee7a698fcade6d8ba6891f47dfed/pkg/ccl/backupccl/restore_planning.go#L2028)
+  new descriptor IDs for the descriptors we're restoring from the backup files. Why do
+  this? Every descriptor on disk has a unique ID, so RESTORE must resolve ID collisions between the
+  stale IDs in the backup and any IDs in the target cluster.
+
+#### Execution
+When a gateway node [resumes](https://github.com/cockroachdb/cockroach/blob/4149ca74099cee7a698fcade6d8ba6891f47dfed/pkg/ccl/backupccl/restore_job.go#L1371)
+a restore job, the following occurs before any processors spin up. For more on processors, check
+out the related section in [Life of a Query](https://github.com/cockroachdb/cockroach/blob/master/docs/tech-notes/life_of_a_query.md#physical-planning-and-execution).
+- [Write](https://github.com/cockroachdb/cockroach/blob/4149ca74099cee7a698fcade6d8ba6891f47dfed/pkg/ccl/backupccl/restore_job.go#L1441)
+  the new descriptors to disk in an offline state so no users can interact with the
+  descriptors during the restore.
+- [Derive](https://github.com/cockroachdb/cockroach/blob/4149ca74099cee7a698fcade6d8ba6891f47dfed/pkg/ccl/backupccl/restore_job.go#L426)
+  a list of key spans we need to restore from the restoring descriptors. A key span is just an
+  interval in the *backup's* key space.
+
+We're now ready to begin loading the backup data into our CockroachDB cluster.
+
+**Important note**: the last range in the restoring cluster's key space is one
+big empty range, with a key span starting above the highest key with data in the
+cluster up to the max key allowed in the cluster. We want to restore to that
+empty range. Recall that a key span is merely an interval of keys, while a range
+is physical: it represents stored data, in a given key span, that is replicated
+across nodes.
+
+Our first task is to split this massive empty range up into smaller ranges that we will restore
+data into, and randomly assign nodes to be leaseholders for these new ranges. More concretely,
+the restore job's gateway node will [iterate through](https://github.com/cockroachdb/cockroach/blob/4149ca74099cee7a698fcade6d8ba6891f47dfed/pkg/ccl/backupccl/restore_processor_planning.go#L84)
+the list of key spans we seek to restore, and round-robin assign them to nodes in the cluster, which
+will then each start up a split and scatter processor.
+
+Each split and scatter processor will then do the following, for each key span it processes:
+- Issue a [split key request](https://github.com/cockroachdb/cockroach/blob/4149ca74099cee7a698fcade6d8ba6891f47dfed/pkg/ccl/backupccl/split_and_scatter_processor.go#L94)
+  to the kv layer at the beginning key of the next span it will
+  process, which splits that big empty range at that given key, creating a new range to import data
+  into. I recommend reading the [code and comments here](https://github.com/cockroachdb/cockroach/blob/4149ca74099cee7a698fcade6d8ba6891f47dfed/pkg/ccl/backupccl/split_and_scatter_processor.go#L352)
+  because the indexing is a little confusing.
+  - Note: before the split request, we [remap](https://github.com/cockroachdb/cockroach/blob/4149ca74099cee7a698fcade6d8ba6891f47dfed/pkg/ccl/backupccl/split_and_scatter_processor.go#L84)
+    this key (currently in the backup's key space) so it maps nicely to the restore cluster's key
+    space. E.g. suppose we want to restore a table with a key span in the backup from 57/1 to 57/2,
+    but the restore cluster already has data in that span. To avoid collisions, we have to remap
+    this key span into the key span of that empty range.
+- Issue a [scatter request](https://github.com/cockroachdb/cockroach/blob/4149ca74099cee7a698fcade6d8ba6891f47dfed/pkg/ccl/backupccl/split_and_scatter_processor.go#L123)
+  to the kv layer on the span's first key. This asks kv to randomly reassign
+  the lease of this key's range. KV may not obey the request.
+- Route info to this new range's new leaseholder, so the leaseholder can restore data into that
+  range.
+
+In addition to running a split and scatter processor, each node will run a restore data processor.
+For each empty range the restore data processor receives, it will [read](https://github.com/cockroachdb/cockroach/blob/4149ca74099cee7a698fcade6d8ba6891f47dfed/pkg/ccl/backupccl/restore_data_processor.go#L167)
+the relevant Backup SSTables in external storage, [remap](https://github.com/cockroachdb/cockroach/blob/4149ca74099cee7a698fcade6d8ba6891f47dfed/pkg/ccl/backupccl/restore_data_processor.go#L433)
+each key to the restore's key space, and [flush](https://github.com/cockroachdb/cockroach/blob/4149ca74099cee7a698fcade6d8ba6891f47dfed/pkg/ccl/backupccl/restore_data_processor.go#L458)
+SSTable(s) to disk, using the [kv client](https://github.com/cockroachdb/cockroach/blob/master/docs/tech-notes/life_of_a_query.md#the-kv-client-interface)
+interface's [`AddSSTable`](https://github.com/cockroachdb/cockroach/blob/4149ca74099cee7a698fcade6d8ba6891f47dfed/pkg/kv/bulk/sst_batcher.go#L490)
+method, which bypasses much of the infrastructure related to writing data to disk from conventional
+queries. Note: all the kv shenanigans (e.g. range data replication, range splitting/merging,
+leaseholder reassignment) are abstracted away from the bulk codebase, though these things can happen
+while the restore is in progress!
+
+TODO: Talk about how Backup uses two schedules that depend on each other and
+must be cleaned up together
+
diff --git a/pkg/ccl/changefeedccl/alter_changefeed_stmt.go b/pkg/ccl/changefeedccl/alter_changefeed_stmt.go
index be0f447f039d..7baba9cd3605 100644
--- a/pkg/ccl/changefeedccl/alter_changefeed_stmt.go
+++ b/pkg/ccl/changefeedccl/alter_changefeed_stmt.go
@@ -102,15 +102,17 @@ func alterChangefeedPlanHook(
 			targetDescs = append(targetDescs, descs...)
 		}
 
-		newTargets, err := getTargets(ctx, p, targetDescs, details.Opts)
+		newTargets, newTables, err := getTargetsAndTables(ctx, p, targetDescs, details.Opts)
 		if err != nil {
 			return err
 		}
 		// add old targets
-		for id, target := range details.Targets {
-			newTargets[id] = target
+		for id, table := range details.Tables {
+			newTables[id] = table
 		}
-		details.Targets = newTargets
+		details.Tables = newTables
+		details.TargetSpecifications = append(details.TargetSpecifications, newTargets...)
+ } if opts.DropTargets != nil { @@ -129,12 +131,21 @@ func alterChangefeedPlanHook( if err := p.CheckPrivilege(ctx, desc, privilege.SELECT); err != nil { return err } - delete(details.Targets, table.GetID()) + delete(details.Tables, table.GetID()) } } + + newTargetSpecifications := make([]jobspb.ChangefeedTargetSpecification, len(details.TargetSpecifications)-len(opts.DropTargets)) + for _, ts := range details.TargetSpecifications { + if _, stillThere := details.Tables[ts.TableID]; stillThere { + newTargetSpecifications = append(newTargetSpecifications, ts) + } + } + details.TargetSpecifications = newTargetSpecifications + } - if len(details.Targets) == 0 { + if len(details.Tables) == 0 { return errors.Errorf("cannot drop all targets for changefeed job %d", jobID) } @@ -152,7 +163,7 @@ func alterChangefeedPlanHook( } var targets tree.TargetList - for _, target := range details.Targets { + for _, target := range details.Tables { targetName := tree.MakeTableNameFromPrefix(tree.ObjectNamePrefix{}, tree.Name(target.StatementTimeName)) targets.Tables = append(targets.Tables, &targetName) } diff --git a/pkg/ccl/changefeedccl/bench_test.go b/pkg/ccl/changefeedccl/bench_test.go index 910853144374..90af0d8cae48 100644 --- a/pkg/ccl/changefeedccl/bench_test.go +++ b/pkg/ccl/changefeedccl/bench_test.go @@ -196,7 +196,7 @@ func createBenchmarkChangefeed( tableDesc := desctestutils.TestingGetPublicTableDescriptor(s.DB(), keys.SystemSQLCodec, database, table) spans := []roachpb.Span{tableDesc.PrimaryIndexSpan(keys.SystemSQLCodec)} details := jobspb.ChangefeedDetails{ - Targets: jobspb.ChangefeedTargets{tableDesc.GetID(): jobspb.ChangefeedTarget{ + Tables: jobspb.ChangefeedTargets{tableDesc.GetID(): jobspb.ChangefeedTargetTable{ StatementTimeName: tableDesc.GetName(), }}, Opts: map[string]string{ @@ -204,7 +204,7 @@ func createBenchmarkChangefeed( }, } initialHighWater := hlc.Timestamp{} - encoder, err := makeJSONEncoder(details.Opts, details.Targets) + encoder, err := makeJSONEncoder(details.Opts, AllTargets(details)) if err != nil { return nil, nil, err } @@ -228,7 +228,7 @@ func createBenchmarkChangefeed( Clock: feedClock, Gossip: gossip.MakeOptionalGossip(s.GossipI().(*gossip.Gossip)), Spans: spans, - Targets: details.Targets, + Targets: AllTargets(details), Writer: buf, Metrics: &metrics.KVFeedMetrics, MM: mm, diff --git a/pkg/ccl/changefeedccl/changefeed.go b/pkg/ccl/changefeedccl/changefeed.go index a4acc8557a39..a9a4bb4aebc2 100644 --- a/pkg/ccl/changefeedccl/changefeed.go +++ b/pkg/ccl/changefeedccl/changefeed.go @@ -55,7 +55,7 @@ func createProtectedTimestampRecord( ctx context.Context, codec keys.SQLCodec, jobID jobspb.JobID, - targets jobspb.ChangefeedTargets, + targets []jobspb.ChangefeedTargetSpecification, resolved hlc.Timestamp, progress *jobspb.ChangefeedProgress, ) *ptpb.Record { @@ -69,19 +69,21 @@ func createProtectedTimestampRecord( jobsprotectedts.Jobs, targetToProtect) } -func makeTargetToProtect(targets jobspb.ChangefeedTargets) *ptpb.Target { +func makeTargetToProtect(targets []jobspb.ChangefeedTargetSpecification) *ptpb.Target { // NB: We add 1 because we're also going to protect system.descriptors. // We protect system.descriptors because a changefeed needs all of the history // of table descriptors to version data. 
tablesToProtect := make(descpb.IDs, 0, len(targets)+1) - for t := range targets { - tablesToProtect = append(tablesToProtect, t) + for _, t := range targets { + tablesToProtect = append(tablesToProtect, t.TableID) } tablesToProtect = append(tablesToProtect, keys.DescriptorTableID) return ptpb.MakeSchemaObjectsTarget(tablesToProtect) } -func makeSpansToProtect(codec keys.SQLCodec, targets jobspb.ChangefeedTargets) []roachpb.Span { +func makeSpansToProtect( + codec keys.SQLCodec, targets []jobspb.ChangefeedTargetSpecification, +) []roachpb.Span { // NB: We add 1 because we're also going to protect system.descriptors. // We protect system.descriptors because a changefeed needs all of the history // of table descriptors to version data. @@ -93,8 +95,8 @@ func makeSpansToProtect(codec keys.SQLCodec, targets jobspb.ChangefeedTargets) [ EndKey: tablePrefix.PrefixEnd(), }) } - for t := range targets { - addTablePrefix(uint32(t)) + for _, t := range targets { + addTablePrefix(uint32(t.TableID)) } addTablePrefix(keys.DescriptorTableID) return spansToProtect diff --git a/pkg/ccl/changefeedccl/changefeed_dist.go b/pkg/ccl/changefeedccl/changefeed_dist.go index 2a0142dac991..abec5d01fc2d 100644 --- a/pkg/ccl/changefeedccl/changefeed_dist.go +++ b/pkg/ccl/changefeedccl/changefeed_dist.go @@ -104,7 +104,7 @@ func distChangefeedFlow( spansTS = spansTS.Next() } var err error - trackedSpans, err = fetchSpansForTargets(ctx, execCfg, details.Targets, spansTS) + trackedSpans, err = fetchSpansForTargets(ctx, execCfg, AllTargets(details), spansTS) if err != nil { return err } @@ -121,7 +121,7 @@ func distChangefeedFlow( func fetchSpansForTargets( ctx context.Context, execCfg *sql.ExecutorConfig, - targets jobspb.ChangefeedTargets, + targets []jobspb.ChangefeedTargetSpecification, ts hlc.Timestamp, ) ([]roachpb.Span, error) { var spans []roachpb.Span @@ -133,10 +133,10 @@ func fetchSpansForTargets( return err } // Note that all targets are currently guaranteed to be tables. 
- for tableID := range targets { + for _, table := range targets { flags := tree.ObjectLookupFlagsWithRequired() flags.AvoidLeased = true - tableDesc, err := descriptors.GetImmutableTableByID(ctx, txn, tableID, flags) + tableDesc, err := descriptors.GetImmutableTableByID(ctx, txn, table.TableID, flags) if err != nil { return err } diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 539d9131a32c..e78fcaa85df1 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -159,7 +159,7 @@ func newChangeAggregatorProcessor( } var err error - if ca.encoder, err = getEncoder(ca.spec.Feed.Opts, ca.spec.Feed.Targets); err != nil { + if ca.encoder, err = getEncoder(ca.spec.Feed.Opts, AllTargets(ca.spec.Feed)); err != nil { return nil, err } @@ -345,7 +345,7 @@ func (ca *changeAggregator) makeKVFeedCfg( if schemaChangePolicy == changefeedbase.OptSchemaChangePolicyIgnore { sf = schemafeed.DoNothingSchemaFeed } else { - sf = schemafeed.New(ctx, cfg, schemaChangeEvents, ca.spec.Feed.Targets, + sf = schemafeed.New(ctx, cfg, schemaChangeEvents, AllTargets(ca.spec.Feed), initialHighWater, &ca.metrics.SchemaFeedMetrics) } @@ -358,7 +358,7 @@ func (ca *changeAggregator) makeKVFeedCfg( Gossip: cfg.Gossip, Spans: spans, BackfillCheckpoint: ca.spec.Checkpoint.Spans, - Targets: ca.spec.Feed.Targets, + Targets: AllTargets(ca.spec.Feed), Metrics: &ca.metrics.KVFeedMetrics, OnBackfillCallback: ca.sliMetrics.getBackfillCallback(), MM: ca.kvFeedMemMon, @@ -1085,7 +1085,7 @@ func newChangeFrontierProcessor( cf.freqEmitResolved = emitNoResolved } - if cf.encoder, err = getEncoder(spec.Feed.Opts, spec.Feed.Targets); err != nil { + if cf.encoder, err = getEncoder(spec.Feed.Opts, AllTargets(spec.Feed)); err != nil { return nil, err } @@ -1454,7 +1454,7 @@ func (cf *changeFrontier) manageProtectedTimestamps( recordID := progress.ProtectedTimestampRecord if recordID == uuid.Nil { - ptr := createProtectedTimestampRecord(ctx, cf.flowCtx.Codec(), cf.spec.JobID, cf.spec.Feed.Targets, highWater, progress) + ptr := createProtectedTimestampRecord(ctx, cf.flowCtx.Codec(), cf.spec.JobID, AllTargets(cf.spec.Feed), highWater, progress) if err := pts.Protect(ctx, txn, ptr); err != nil { return err } diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index b88eca4886c3..745d350deaa5 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -181,16 +181,17 @@ func changefeedPlanHook( return err } - targets, err := getTargets(ctx, p, targetDescs, opts) + targets, tables, err := getTargetsAndTables(ctx, p, targetDescs, opts) if err != nil { return err } details := jobspb.ChangefeedDetails{ - Targets: targets, - Opts: opts, - SinkURI: sinkURI, - StatementTime: statementTime, + Tables: tables, + Opts: opts, + SinkURI: sinkURI, + StatementTime: statementTime, + TargetSpecifications: targets, } progress := jobspb.Progress{ Progress: &jobspb.Progress_HighWater{}, @@ -243,7 +244,7 @@ func changefeedPlanHook( return err } - if _, err := getEncoder(details.Opts, details.Targets); err != nil { + if _, err := getEncoder(details.Opts, AllTargets(details)); err != nil { return err } @@ -269,7 +270,7 @@ func changefeedPlanHook( } telemetry.Count(`changefeed.create.sink.` + telemetrySink) telemetry.Count(`changefeed.create.format.` + details.Opts[changefeedbase.OptFormat]) - telemetry.CountBucketed(`changefeed.create.num_tables`, 
int64(len(targets))) + telemetry.CountBucketed(`changefeed.create.num_tables`, int64(len(tables))) if scope, ok := opts[changefeedbase.OptMetricsScope]; ok { if err := utilccl.CheckEnterpriseEnabled( @@ -332,7 +333,7 @@ func changefeedPlanHook( var protectedTimestampID uuid.UUID codec := p.ExecCfg().Codec if shouldProtectTimestamps(codec) { - ptr = createProtectedTimestampRecord(ctx, codec, jobID, details.Targets, statementTime, progress.GetChangefeed()) + ptr = createProtectedTimestampRecord(ctx, codec, jobID, AllTargets(details), statementTime, progress.GetChangefeed()) protectedTimestampID = ptr.ID.GetUUID() } @@ -470,35 +471,41 @@ func getTableDescriptors( return targetDescs, err } -func getTargets( +func getTargetsAndTables( ctx context.Context, p sql.PlanHookState, targetDescs []catalog.Descriptor, opts map[string]string, -) (jobspb.ChangefeedTargets, error) { - targets := make(jobspb.ChangefeedTargets, len(targetDescs)) +) ([]jobspb.ChangefeedTargetSpecification, jobspb.ChangefeedTargets, error) { + tables := make(jobspb.ChangefeedTargets, len(targetDescs)) + targets := make([]jobspb.ChangefeedTargetSpecification, len(targetDescs)) for _, desc := range targetDescs { if table, isTable := desc.(catalog.TableDescriptor); isTable { if err := p.CheckPrivilege(ctx, desc, privilege.SELECT); err != nil { - return nil, err + return nil, nil, err } _, qualified := opts[changefeedbase.OptFullTableName] name, err := getChangefeedTargetName(ctx, table, p.ExecCfg(), p.ExtendedEvalContext().Txn, qualified) if err != nil { - return nil, err + return nil, nil, err } - targets[table.GetID()] = jobspb.ChangefeedTarget{ + tables[table.GetID()] = jobspb.ChangefeedTargetTable{ StatementTimeName: name, } + ts := jobspb.ChangefeedTargetSpecification{ + Type: jobspb.ChangefeedTargetSpecification_PRIMARY_FAMILY_ONLY, + TableID: table.GetID(), + } + targets = append(targets, ts) if err := changefeedbase.ValidateTable(targets, table); err != nil { - return nil, err + return nil, nil, err } - for _, warning := range changefeedbase.WarningsForTable(targets, table, opts) { + for _, warning := range changefeedbase.WarningsForTable(tables, table, opts) { p.BufferClientNotice(ctx, pgnotice.Newf("%s", warning)) } } } - return targets, nil + return targets, tables, nil } func validateSink( @@ -953,3 +960,28 @@ func getChangefeedTargetName( } return desc.GetName(), nil } + +// AllTargets gets all the targets listed in a ChangefeedDetails, +// from the statement time name map in old protos +// or the TargetSpecifications in new ones. 
+func AllTargets(cd jobspb.ChangefeedDetails) (targets []jobspb.ChangefeedTargetSpecification) { + //TODO: Use a version gate for this once we have CDC version gates + if len(cd.TargetSpecifications) > 0 { + for _, ts := range cd.TargetSpecifications { + if ts.TableID > 0 { + ts.StatementTimeName = cd.Tables[ts.TableID].StatementTimeName + targets = append(targets, ts) + } + } + } else { + for id, t := range cd.Tables { + ct := jobspb.ChangefeedTargetSpecification{ + Type: jobspb.ChangefeedTargetSpecification_PRIMARY_FAMILY_ONLY, + TableID: id, + StatementTimeName: t.StatementTimeName, + } + targets = append(targets, ct) + } + } + return +} diff --git a/pkg/ccl/changefeedccl/changefeedbase/validate.go b/pkg/ccl/changefeedccl/changefeedbase/validate.go index 3027919bf8a0..056cab49ed66 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/validate.go +++ b/pkg/ccl/changefeedccl/changefeedbase/validate.go @@ -15,9 +15,19 @@ import ( ) // ValidateTable validates that a table descriptor can be watched by a CHANGEFEED. -func ValidateTable(targets jobspb.ChangefeedTargets, tableDesc catalog.TableDescriptor) error { - t, ok := targets[tableDesc.GetID()] - if !ok { +func ValidateTable( + targets []jobspb.ChangefeedTargetSpecification, tableDesc catalog.TableDescriptor, +) error { + var t jobspb.ChangefeedTargetSpecification + var found bool + for _, cts := range targets { + if cts.TableID == tableDesc.GetID() { + t = cts + found = true + break + } + } + if !found { return errors.Errorf(`unwatched table: %s`, tableDesc.GetName()) } diff --git a/pkg/ccl/changefeedccl/encoder.go b/pkg/ccl/changefeedccl/encoder.go index e619f98f575a..7aef94121f47 100644 --- a/pkg/ccl/changefeedccl/encoder.go +++ b/pkg/ccl/changefeedccl/encoder.go @@ -82,7 +82,9 @@ type Encoder interface { EncodeResolvedTimestamp(context.Context, string, hlc.Timestamp) ([]byte, error) } -func getEncoder(opts map[string]string, targets jobspb.ChangefeedTargets) (Encoder, error) { +func getEncoder( + opts map[string]string, targets []jobspb.ChangefeedTargetSpecification, +) (Encoder, error) { switch changefeedbase.FormatType(opts[changefeedbase.OptFormat]) { case ``, changefeedbase.OptFormatJSON: return makeJSONEncoder(opts, targets) @@ -102,7 +104,7 @@ func getEncoder(opts map[string]string, targets jobspb.ChangefeedTargets) (Encod type jsonEncoder struct { updatedField, mvccTimestampField, beforeField, wrapped, keyOnly, keyInValue, topicInValue bool - targets jobspb.ChangefeedTargets + targets []jobspb.ChangefeedTargetSpecification alloc tree.DatumAlloc buf bytes.Buffer virtualColumnVisibility string @@ -111,7 +113,7 @@ type jsonEncoder struct { var _ Encoder = &jsonEncoder{} func makeJSONEncoder( - opts map[string]string, targets jobspb.ChangefeedTargets, + opts map[string]string, targets []jobspb.ChangefeedTargetSpecification, ) (*jsonEncoder, error) { e := &jsonEncoder{ targets: targets, @@ -184,12 +186,13 @@ func (e *jsonEncoder) encodeKeyRaw(row encodeRow) ([]interface{}, error) { func (e *jsonEncoder) encodeTopicRaw(row encodeRow) (interface{}, error) { descID := row.tableDesc.GetID() // use the target list since row.tableDesc.GetName() will not have fully qualified names - topicName, ok := e.targets[descID] - if !ok { - return nil, fmt.Errorf("table with name %s and descriptor ID %d not found in changefeed target list", - row.tableDesc.GetName(), descID) + for _, topic := range e.targets { + if topic.TableID == descID { + return topic.StatementTimeName, nil + } } - return topicName.StatementTimeName, nil + return nil, 
fmt.Errorf("table with name %s and descriptor ID %d not found in changefeed target list", + row.tableDesc.GetName(), descID) } // EncodeValue implements the Encoder interface. @@ -328,8 +331,8 @@ type confluentAvroEncoder struct { schemaRegistry schemaRegistry schemaPrefix string updatedField, beforeField, keyOnly bool - targets jobspb.ChangefeedTargets virtualColumnVisibility string + targets []jobspb.ChangefeedTargetSpecification keyCache *cache.UnorderedCache // [tableIDAndVersion]confluentRegisteredKeySchema valueCache *cache.UnorderedCache // [tableIDAndVersionPair]confluentRegisteredEnvelopeSchema @@ -367,7 +370,7 @@ var encoderCacheConfig = cache.Config{ } func newConfluentAvroEncoder( - opts map[string]string, targets jobspb.ChangefeedTargets, + opts map[string]string, targets []jobspb.ChangefeedTargetSpecification, ) (*confluentAvroEncoder, error) { e := &confluentAvroEncoder{ schemaPrefix: opts[changefeedbase.OptAvroSchemaPrefix], @@ -422,7 +425,13 @@ func newConfluentAvroEncoder( // Get the raw SQL-formatted string for a table name // and apply full_table_name and avro_schema_prefix options func (e *confluentAvroEncoder) rawTableName(desc catalog.TableDescriptor) string { - return e.schemaPrefix + e.targets[desc.GetID()].StatementTimeName + for _, spec := range e.targets { + if spec.TableID == desc.GetID() { + return e.schemaPrefix + spec.StatementTimeName + } + } + //TODO (zinger): error here + return desc.GetName() } // EncodeKey implements the Encoder interface. diff --git a/pkg/ccl/changefeedccl/encoder_test.go b/pkg/ccl/changefeedccl/encoder_test.go index c85c34bb6237..69b0fb8407b2 100644 --- a/pkg/ccl/changefeedccl/encoder_test.go +++ b/pkg/ccl/changefeedccl/encoder_test.go @@ -209,11 +209,12 @@ func TestEncoders(t *testing.T) { t.Fatalf(`unknown format: %s`, o[changefeedbase.OptFormat]) } - target := jobspb.ChangefeedTarget{ + target := jobspb.ChangefeedTargetSpecification{ + Type: jobspb.ChangefeedTargetSpecification_PRIMARY_FAMILY_ONLY, + TableID: tableDesc.GetID(), StatementTimeName: tableDesc.GetName(), } - targets := jobspb.ChangefeedTargets{} - targets[tableDesc.GetID()] = target + targets := []jobspb.ChangefeedTargetSpecification{target} e, err := getEncoder(o, targets) if len(expected.err) > 0 { @@ -360,11 +361,12 @@ func TestAvroEncoderWithTLS(t *testing.T) { return string(avroToJSON(t, reg, r)) } - target := jobspb.ChangefeedTarget{ + target := jobspb.ChangefeedTargetSpecification{ + Type: jobspb.ChangefeedTargetSpecification_PRIMARY_FAMILY_ONLY, + TableID: tableDesc.GetID(), StatementTimeName: tableDesc.GetName(), } - targets := jobspb.ChangefeedTargets{} - targets[tableDesc.GetID()] = target + targets := []jobspb.ChangefeedTargetSpecification{target} e, err := getEncoder(opts, targets) require.NoError(t, err) diff --git a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go index 96af723b2c4b..55751564d995 100644 --- a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go +++ b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go @@ -43,7 +43,7 @@ type Config struct { Gossip gossip.OptionalGossip Spans []roachpb.Span BackfillCheckpoint []roachpb.Span - Targets jobspb.ChangefeedTargets + Targets []jobspb.ChangefeedTargetSpecification Writer kvevent.Writer Metrics *kvevent.Metrics OnBackfillCallback func() func() diff --git a/pkg/ccl/changefeedccl/schemafeed/schema_feed.go b/pkg/ccl/changefeedccl/schemafeed/schema_feed.go index 70fb4ee5f90b..2761e653b6f0 100644 --- a/pkg/ccl/changefeedccl/schemafeed/schema_feed.go +++ 
diff --git a/pkg/ccl/changefeedccl/schemafeed/schema_feed.go b/pkg/ccl/changefeedccl/schemafeed/schema_feed.go
index 70fb4ee5f90b..2761e653b6f0 100644
--- a/pkg/ccl/changefeedccl/schemafeed/schema_feed.go
+++ b/pkg/ccl/changefeedccl/schemafeed/schema_feed.go
@@ -78,7 +78,7 @@ func New(
 	ctx context.Context,
 	cfg *execinfra.ServerConfig,
 	events changefeedbase.SchemaChangeEventClass,
-	targets jobspb.ChangefeedTargets,
+	targets []jobspb.ChangefeedTargetSpecification,
 	initialHighwater hlc.Timestamp,
 	metrics *Metrics,
 ) SchemaFeed {
@@ -114,7 +114,7 @@ type schemaFeed struct {
 	db *kv.DB
 	clock *hlc.Clock
 	settings *cluster.Settings
-	targets jobspb.ChangefeedTargets
+	targets []jobspb.ChangefeedTargetSpecification
 	ie sqlutil.InternalExecutor
 	metrics *Metrics
 
@@ -278,10 +278,10 @@ func (tf *schemaFeed) primeInitialTableDescs(ctx context.Context) error {
 		return err
 	}
 	// Note that all targets are currently guaranteed to be tables.
-	for tableID := range tf.targets {
+	for _, table := range tf.targets {
 		flags := tree.ObjectLookupFlagsWithRequired()
 		flags.AvoidLeased = true
-		tableDesc, err := descriptors.GetImmutableTableByID(ctx, txn, tableID, flags)
+		tableDesc, err := descriptors.GetImmutableTableByID(ctx, txn, table.TableID, flags)
 		if err != nil {
 			return err
 		}
@@ -636,8 +636,15 @@ func (tf *schemaFeed) fetchDescriptorVersions(
 		if err != nil {
 			return err
 		}
-
-		origName, isTable := tf.targets[descpb.ID(id)]
+		var origName string
+		var isTable bool
+		for _, cts := range tf.targets {
+			if cts.TableID == descpb.ID(id) {
+				origName = cts.StatementTimeName
+				isTable = true
+				break
+			}
+		}
 		isType := tf.mu.typeDeps.containsType(descpb.ID(id))
 		// Check if the descriptor is an interesting table or type.
 		if !(isTable || isType) {
@@ -647,7 +654,7 @@ func (tf *schemaFeed) fetchDescriptorVersions(
 		unsafeValue := it.UnsafeValue()
 		if unsafeValue == nil {
-			name := origName.StatementTimeName
+			name := origName
 			if name == "" {
 				name = fmt.Sprintf("desc(%d)", id)
 			}
diff --git a/pkg/ccl/changefeedccl/sink.go b/pkg/ccl/changefeedccl/sink.go
index b84e7f3e7bc2..5aea47105877 100644
--- a/pkg/ccl/changefeedccl/sink.go
+++ b/pkg/ccl/changefeedccl/sink.go
@@ -109,7 +109,7 @@ func getSink(
 		return makeNullSink(sinkURL{URL: u}, m)
 	case u.Scheme == changefeedbase.SinkSchemeKafka:
 		return validateOptionsAndMakeSink(changefeedbase.KafkaValidOptions, func() (Sink, error) {
-			return makeKafkaSink(ctx, sinkURL{URL: u}, feedCfg.Targets, feedCfg.Opts, m)
+			return makeKafkaSink(ctx, sinkURL{URL: u}, AllTargets(feedCfg), feedCfg.Opts, m)
 		})
 	case isWebhookSink(u):
 		return validateOptionsAndMakeSink(changefeedbase.WebhookValidOptions, func() (Sink, error) {
@@ -119,7 +119,7 @@ func getSink(
 	case isPubsubSink(u):
 		// TODO: add metrics to pubsubsink
 		return validateOptionsAndMakeSink(changefeedbase.PubsubValidOptions, func() (Sink, error) {
-			return MakePubsubSink(ctx, u, feedCfg.Opts, feedCfg.Targets)
+			return MakePubsubSink(ctx, u, feedCfg.Opts, AllTargets(feedCfg))
 		})
 	case isCloudStorageSink(u):
 		return validateOptionsAndMakeSink(changefeedbase.CloudStorageValidOptions, func() (Sink, error) {
@@ -130,7 +130,7 @@ func getSink(
 	case u.Scheme == changefeedbase.SinkSchemeExperimentalSQL:
 		return validateOptionsAndMakeSink(changefeedbase.SQLValidOptions, func() (Sink, error) {
-			return makeSQLSink(sinkURL{URL: u}, sqlSinkTableName, feedCfg.Targets, m)
+			return makeSQLSink(sinkURL{URL: u}, sqlSinkTableName, AllTargets(feedCfg), m)
 		})
 	case u.Scheme == "":
 		return nil, errors.Errorf(`no scheme found for sink URL %q`, feedCfg.SinkURI)
diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go
index a9a6b2a0350a..64ea1e83d756 100644
--- a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go
+++ b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go
@@ -145,7 +145,7 @@ func TestCloudStorageSink(t *testing.T) {
 		changefeedbase.OptCompression: ``, // NB: overridden in single-node subtest.
 	}
 	ts := func(i int64) hlc.Timestamp { return hlc.Timestamp{WallTime: i} }
-	e, err := makeJSONEncoder(opts, jobspb.ChangefeedTargets{})
+	e, err := makeJSONEncoder(opts, []jobspb.ChangefeedTargetSpecification{})
 	require.NoError(t, err)
 
 	clientFactory := blobs.TestBlobServiceClient(settings.ExternalIODir)
diff --git a/pkg/ccl/changefeedccl/sink_kafka.go b/pkg/ccl/changefeedccl/sink_kafka.go
index 0e346e5595fe..273578cfcba4 100644
--- a/pkg/ccl/changefeedccl/sink_kafka.go
+++ b/pkg/ccl/changefeedccl/sink_kafka.go
@@ -415,18 +415,18 @@ func (p *changefeedPartitioner) Partition(
 }
 
 func makeTopicsMap(
-	prefix string, name string, targets jobspb.ChangefeedTargets,
+	prefix string, name string, targets []jobspb.ChangefeedTargetSpecification,
 ) map[descpb.ID]string {
 	topics := make(map[descpb.ID]string)
 	useSingleName := name != ""
 	if useSingleName {
 		name = prefix + SQLNameToKafkaName(name)
 	}
-	for id, t := range targets {
+	for _, t := range targets {
 		if useSingleName {
-			topics[id] = name
+			topics[t.TableID] = name
 		} else {
-			topics[id] = prefix + SQLNameToKafkaName(t.StatementTimeName)
+			topics[t.TableID] = prefix + SQLNameToKafkaName(t.StatementTimeName)
 		}
 	}
 	return topics
@@ -649,7 +649,7 @@ func buildKafkaConfig(u sinkURL, opts map[string]string) (*sarama.Config, error)
 func makeKafkaSink(
 	ctx context.Context,
 	u sinkURL,
-	targets jobspb.ChangefeedTargets,
+	targets []jobspb.ChangefeedTargetSpecification,
 	opts map[string]string,
 	m *sliMetrics,
 ) (Sink, error) {
diff --git a/pkg/ccl/changefeedccl/sink_pubsub.go b/pkg/ccl/changefeedccl/sink_pubsub.go
index 0fa5f912f41f..110e721b7abc 100644
--- a/pkg/ccl/changefeedccl/sink_pubsub.go
+++ b/pkg/ccl/changefeedccl/sink_pubsub.go
@@ -139,7 +139,10 @@ func getGCPCredentials(ctx context.Context, u sinkURL) (*google.Credentials, err
 // MakePubsubSink returns the corresponding pubsub sink based on the url given
 func MakePubsubSink(
-	ctx context.Context, u *url.URL, opts map[string]string, targets jobspb.ChangefeedTargets,
+	ctx context.Context,
+	u *url.URL,
+	opts map[string]string,
+	targets []jobspb.ChangefeedTargetSpecification,
 ) (Sink, error) {
 
 	pubsubURL := sinkURL{URL: u, q: u.Query()}
@@ -294,19 +297,19 @@ func (p *gcpPubsubClient) getTopicClient(topicID descpb.ID) (*pubsub.Topic, erro
 }
 
 func (p *pubsubSink) getTopicsMap(
-	targets jobspb.ChangefeedTargets, pubsubTopicName string,
+	targets []jobspb.ChangefeedTargetSpecification, pubsubTopicName string,
 ) map[descpb.ID]*topicStruct {
 	topics := make(map[descpb.ID]*topicStruct)
 
 	// creates a topic for each target
-	for id, target := range targets {
+	for _, target := range targets {
 		var topicName string
 		if pubsubTopicName != "" {
 			topicName = pubsubTopicName
 		} else {
 			topicName = target.StatementTimeName
 		}
-		topics[id] = &topicStruct{topicName: topicName}
+		topics[target.TableID] = &topicStruct{topicName: topicName}
 	}
 	return topics
 }
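A hypothetical test — not in the patch, with invented IDs and names, and assuming `SQLNameToKafkaName` passes simple lowercase names through unchanged — illustrates the two naming modes of `makeTopicsMap`:

```go
package changefeedccl

import (
	"testing"

	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
	"github.com/stretchr/testify/require"
)

func TestMakeTopicsMapSketch(t *testing.T) {
	targets := []jobspb.ChangefeedTargetSpecification{
		{TableID: 52, StatementTimeName: "rides"},
		{TableID: 53, StatementTimeName: "users"},
	}

	// Default mode: one topic per target, keyed by TableID.
	perTable := makeTopicsMap("prefix_", "", targets)
	require.Equal(t, "prefix_rides", perTable[52])
	require.Equal(t, "prefix_users", perTable[53])

	// Override mode: a non-empty name collapses every target onto one topic.
	single := makeTopicsMap("prefix_", "all", targets)
	require.Equal(t, "prefix_all", single[52])
	require.Equal(t, "prefix_all", single[53])
}
```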
diff --git a/pkg/ccl/changefeedccl/sink_sql.go b/pkg/ccl/changefeedccl/sink_sql.go
index e10fff92251e..9a54dbee9989 100644
--- a/pkg/ccl/changefeedccl/sink_sql.go
+++ b/pkg/ccl/changefeedccl/sink_sql.go
@@ -72,7 +72,7 @@ type sqlSink struct {
 const sqlSinkTableName = `sqlsink`
 
 func makeSQLSink(
-	u sinkURL, tableName string, targets jobspb.ChangefeedTargets, m *sliMetrics,
+	u sinkURL, tableName string, targets []jobspb.ChangefeedTargetSpecification, m *sliMetrics,
 ) (Sink, error) {
 	// Swap the changefeed prefix for the sql connection one that sqlSink
 	// expects.
@@ -84,9 +84,9 @@ func makeSQLSink(
 
 	topics := make(map[string]struct{})
 	targetNames := make(map[descpb.ID]string)
-	for id, t := range targets {
+	for _, t := range targets {
 		topics[t.StatementTimeName] = struct{}{}
-		targetNames[id] = t.StatementTimeName
+		targetNames[t.TableID] = t.StatementTimeName
 	}
 
 	uri := u.String()
diff --git a/pkg/ccl/changefeedccl/sink_test.go b/pkg/ccl/changefeedccl/sink_test.go
index 9a441e685bc6..1c6eb2059e9e 100644
--- a/pkg/ccl/changefeedccl/sink_test.go
+++ b/pkg/ccl/changefeedccl/sink_test.go
@@ -203,10 +203,13 @@ func makeTestKafkaSink(
 	}
 }
 
-func makeChangefeedTargets(targetNames ...string) jobspb.ChangefeedTargets {
-	targets := make(jobspb.ChangefeedTargets, len(targetNames))
+func makeChangefeedTargets(targetNames ...string) []jobspb.ChangefeedTargetSpecification {
+	targets := make([]jobspb.ChangefeedTargetSpecification, len(targetNames))
 	for i, name := range targetNames {
-		targets[descpb.ID(i)] = jobspb.ChangefeedTarget{StatementTimeName: name}
+		targets[i] = jobspb.ChangefeedTargetSpecification{
+			TableID:           descpb.ID(i),
+			StatementTimeName: name,
+		}
 	}
 	return targets
 }
@@ -402,9 +405,9 @@ func TestSQLSink(t *testing.T) {
 	fooTopic := overrideTopic(`foo`)
 	barTopic := overrideTopic(`bar`)
-	targets := jobspb.ChangefeedTargets{
-		fooTopic.GetID(): jobspb.ChangefeedTarget{StatementTimeName: `foo`},
-		barTopic.GetID(): jobspb.ChangefeedTarget{StatementTimeName: `bar`},
+	targets := []jobspb.ChangefeedTargetSpecification{
+		{TableID: fooTopic.GetID(), StatementTimeName: `foo`},
+		{TableID: barTopic.GetID(), StatementTimeName: `bar`},
 	}
 	const testTableName = `sink`
 	sink, err := makeSQLSink(sinkURL{URL: &pgURL}, testTableName, targets, nil)
diff --git a/pkg/ccl/changefeedccl/sink_webhook_test.go b/pkg/ccl/changefeedccl/sink_webhook_test.go
index 98a0a396f164..cbb9690edc38 100644
--- a/pkg/ccl/changefeedccl/sink_webhook_test.go
+++ b/pkg/ccl/changefeedccl/sink_webhook_test.go
@@ -99,7 +99,7 @@ func testSendAndReceiveRows(t *testing.T, sinkSrc Sink, sinkDest *cdctest.MockWe
 		"sink %s expected to receive message %s", sinkDest.URL(),
 		"{\"payload\":[{\"after\":null,\"key\":[1002],\"topic:\":\"foo\"}],\"length\":1}")
 
-	enc, err := makeJSONEncoder(getGenericWebhookSinkOptions(), jobspb.ChangefeedTargets{})
+	enc, err := makeJSONEncoder(getGenericWebhookSinkOptions(), []jobspb.ChangefeedTargetSpecification{})
 	require.NoError(t, err)
 
 	// test a resolved timestamp entry
diff --git a/pkg/ccl/streamingccl/streamproducer/replication_stream_planning.go b/pkg/ccl/streamingccl/streamproducer/replication_stream_planning.go
index 5217583677d0..16503e269810 100644
--- a/pkg/ccl/streamingccl/streamproducer/replication_stream_planning.go
+++ b/pkg/ccl/streamingccl/streamproducer/replication_stream_planning.go
@@ -93,7 +93,7 @@ func streamKVs(
 	}
 
 	details := jobspb.ChangefeedDetails{
-		Targets: nil, // Not interested in schema changes
+		Tables: nil, // Not interested in schema changes
 		Opts: cfOpts,
 		SinkURI: "", // TODO(yevgeniy): Support sinks
 		StatementTime: statementTime,
diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto
index cad1b66d0fa5..1f95670ecee6 100644
--- a/pkg/jobs/jobspb/jobs.proto
+++ b/pkg/jobs/jobspb/jobs.proto
@@ -692,31 +692,42 @@ message SchemaChangeGCProgress {
   bool ranges_unsplit_done = 4;
 }
 
-message ChangefeedTarget {
+message ChangefeedTargetTable {
   string statement_time_name = 1;
+}
+
+message ChangefeedTargetSpecification {
+  enum TargetType {
+    // The primary index of the table with table_id descriptor id.
+    // Fail if there are ever multiple column families.
+    PRIMARY_FAMILY_ONLY = 0;
+
+    // The primary index of the table with table_id descriptor id.
+    // Each column family gets its own record schema and events.
+    EACH_FAMILY = 1;
+
+    // Column family family_name of table table_id.
+    COLUMN_FAMILY = 2;
+
+    // Add TargetTypes for database, secondary index, etc. when implemented.
+  }
+
+  TargetType type = 1;
+  uint32 table_id = 2 [(gogoproto.customname) = "TableID",
+    (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.ID"];
+  string family_name = 3;
+  string statement_time_name = 4;
 
-  // TODO(dan): Add partition name, ranges of primary keys.
 }
 
 message ChangefeedDetails {
-  // Targets contains the user-specified tables and databases to watch, mapping
-  // the descriptor id to the name at the time of changefeed creation. There is
-  // a 1:1 correspondence between unique targets in the original sql query and
-  // entries in this map.
-  //
-  //  - A watched table is stored here under its table id
-  //  - TODO(dan): A watched database is stored here under its database id
-  //  - TODO(dan): A db.* expansion is treated identically to watching the
-  //    database
-  //
-  // Note that the TODOs mean this field currently is guaranteed to only hold
-  // table ids and a cluster version check will be added when this changes.
-  //
+  // Tables contains the user-specified tables to watch, mapping
+  // the descriptor id to the name at the time of changefeed creation.
   // The names at resolution time are included so that table and database
-  // renames can be detected. They are also used to construct an error message
-  // if the descriptor id no longer exists when the job is unpaused (which can
-  // happen if it was dropped or truncated).
-  map<uint32, ChangefeedTarget> targets = 6 [
+  // renames can be tolerated and derived topic names remain immutable.
+  //
+  map<uint32, ChangefeedTargetTable> tables = 6 [
     (gogoproto.castkey) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.ID",
     (gogoproto.casttype) = "ChangefeedTargets",
     (gogoproto.nullable) = false
@@ -724,8 +735,10 @@ message ChangefeedDetails {
   string sink_uri = 3 [(gogoproto.customname) = "SinkURI"];
   map<string, string> opts = 4;
   util.hlc.Timestamp statement_time = 7 [(gogoproto.nullable) = false];
+  repeated ChangefeedTargetSpecification target_specifications = 8 [(gogoproto.nullable) = false];
 
   reserved 1, 2, 5;
+  reserved "targets";
 }
 
 message ResolvedSpan {
diff --git a/pkg/jobs/jobspb/wrap.go b/pkg/jobs/jobspb/wrap.go
index 24bfea0f345f..2e977e22c1ab 100644
--- a/pkg/jobs/jobspb/wrap.go
+++ b/pkg/jobs/jobspb/wrap.go
@@ -305,7 +305,7 @@ func WrapPayloadDetails(details Details) interface {
 }
 
 // ChangefeedTargets is a set of id targets with metadata.
-type ChangefeedTargets map[descpb.ID]ChangefeedTarget
+type ChangefeedTargets map[descpb.ID]ChangefeedTargetTable
 
 // SchemaChangeDetailsFormatVersion is the format version for
 // SchemaChangeDetails.
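The new `TargetType` enum is what later allows a changefeed to watch individual column families. As a hedged illustration only — `columnFamilySpecs` is a hypothetical helper, and nothing in this diff wires up `COLUMN_FAMILY` end to end — specifications for a multi-family table could be built like so:

```go
package changefeedccl

import (
	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
)

// columnFamilySpecs emits one COLUMN_FAMILY specification per family of a
// single table, so each family can get its own schema and event stream.
func columnFamilySpecs(
	id descpb.ID, name string, families []string,
) []jobspb.ChangefeedTargetSpecification {
	specs := make([]jobspb.ChangefeedTargetSpecification, 0, len(families))
	for _, fam := range families {
		specs = append(specs, jobspb.ChangefeedTargetSpecification{
			Type:              jobspb.ChangefeedTargetSpecification_COLUMN_FAMILY,
			TableID:           id,
			FamilyName:        fam,
			StatementTimeName: name,
		})
	}
	return specs
}
```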
diff --git a/pkg/ui/workspaces/cluster-ui/src/icon/circleFilled.tsx b/pkg/ui/workspaces/cluster-ui/src/icon/circleFilled.tsx
index 770f2974e995..6993a159539d 100644
--- a/pkg/ui/workspaces/cluster-ui/src/icon/circleFilled.tsx
+++ b/pkg/ui/workspaces/cluster-ui/src/icon/circleFilled.tsx
@@ -14,8 +14,11 @@ interface IconProps {
   className: string;
 }
 
-export const CircleFilled = ({ className, ...props }: IconProps) => (
-  ...
-);
+export function CircleFilled(props: IconProps) {
+  const { className } = props;
+  return (
+    ...
+  );
+}
diff --git a/pkg/ui/workspaces/cluster-ui/src/queryFilter/filter.tsx b/pkg/ui/workspaces/cluster-ui/src/queryFilter/filter.tsx
index 25bfa067fddd..d6237b7ec717 100644
--- a/pkg/ui/workspaces/cluster-ui/src/queryFilter/filter.tsx
+++ b/pkg/ui/workspaces/cluster-ui/src/queryFilter/filter.tsx
@@ -43,6 +43,7 @@ interface QueryFilter {
   showScan?: boolean;
   showRegions?: boolean;
   showNodes?: boolean;
+  timeLabel?: string;
 }
 interface FilterState {
   hide: boolean;
@@ -69,6 +70,7 @@ export interface Filters {
 const timeUnit = [
   { label: "seconds", value: "seconds" },
   { label: "milliseconds", value: "milliseconds" },
+  { label: "minutes", value: "minutes" },
 ];
 
 export const defaultFilters: Filters = {
@@ -239,9 +241,15 @@ export const calculateActiveFilters = (filters: Filters): number => {
 
 export const getTimeValueInSeconds = (filters: Filters): number | "empty" => {
   if (filters.timeNumber === "0") return "empty";
-  return filters.timeUnit === "seconds"
-    ? Number(filters.timeNumber)
-    : Number(filters.timeNumber) / 1000;
+  switch (filters.timeUnit) {
+    case "seconds":
+      return Number(filters.timeNumber);
+    case "minutes":
+      return Number(filters.timeNumber) * 60;
+    default:
+      // Milliseconds
+      return Number(filters.timeNumber) / 1000;
+  }
 };
 
 export class Filter extends React.Component {
@@ -355,6 +363,7 @@ export class Filter extends React.Component {
       showScan,
       showRegions,
       showNodes,
+      timeLabel,
     } = this.props;
     const dropdownArea = hide ? hidden : dropdown;
     const customStyles = {
@@ -549,7 +558,7 @@ export class Filter extends React.Component {
           {showRegions ? regionsFilter : ""}
           {showNodes ? nodesFilter : ""}
-          Statement fingerprint runs longer than
+          {timeLabel ? timeLabel : "Statement fingerprint runs longer than"}
{ - @@ -293,6 +289,11 @@ export class SessionDetails extends React.Component { value={yesOrNo(txn.is_historical)} className={cx("details-item")} /> + { value={TimestampToMoment(stmt.start).format(DATE_FORMAT)} className={cx("details-item")} /> - - this.props.onStatementClick && this.props.onStatementClick() + + + this.props.onStatementClick && + this.props.onStatementClick() + } + > + View Statement Details + + } - > - View Statement Details - + value={""} + className={cx("details-item")} + /> @@ -363,10 +373,12 @@ export class SessionDetails extends React.Component { label={"Gateway Node"} value={ this.props.uiConfig.showGatewayNodeLink ? ( - +
+ +
) : ( session.node_id.toString() ) @@ -374,11 +386,34 @@ export class SessionDetails extends React.Component { className={cx("details-item")} /> )} + + + 0 + ? "session-status-icon__active" + : "session-status-icon__idle", + )} + /> + + {session.active_queries.length > 0 ? "Active" : "Idle"} + + + } + className={cx("details-item")} + /> @@ -386,6 +421,11 @@ export class SessionDetails extends React.Component { alloc_bytes={session.alloc_bytes} max_alloc_bytes={session.max_alloc_bytes} /> + @@ -394,7 +434,7 @@ export class SessionDetails extends React.Component { {txnInfo} - Statement + Most Recent Statement {curStmtInfo} diff --git a/pkg/ui/workspaces/cluster-ui/src/sessions/sessionPage.module.scss b/pkg/ui/workspaces/cluster-ui/src/sessions/sessionPage.module.scss index 6fe0b635cc73..3fd710c522ed 100644 --- a/pkg/ui/workspaces/cluster-ui/src/sessions/sessionPage.module.scss +++ b/pkg/ui/workspaces/cluster-ui/src/sessions/sessionPage.module.scss @@ -15,3 +15,12 @@ all: initial; font-family: $font-family--base; } + +.sessions-filter { + font-size: $font-size--medium; + margin-bottom: $spacing-smaller; +} + +.session-column-selector { + margin-bottom: $spacing-smaller; +} diff --git a/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsPage.fixture.ts b/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsPage.fixture.ts index 96591fdf23f2..c9f3c9b3010e 100644 --- a/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsPage.fixture.ts +++ b/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsPage.fixture.ts @@ -15,6 +15,7 @@ import Long from "long"; import { cockroach } from "@cockroachlabs/crdb-protobuf-client"; const Phase = cockroach.server.serverpb.ActiveQuery.Phase; import { util } from "protobufjs"; +import { defaultFilters, Filters } from "../queryFilter"; import { CancelQueryRequestMessage, CancelSessionRequestMessage, @@ -142,7 +143,16 @@ const sessionsList: SessionInfo[] = [ activeSession, ]; +export const filters: Filters = { + app: "", + timeNumber: "0", + timeUnit: "seconds", + regions: "", + nodes: "", +}; + export const sessionsPagePropsFixture: SessionsPageProps = { + filters: defaultFilters, history, location: { pathname: "/sessions", @@ -162,6 +172,8 @@ export const sessionsPagePropsFixture: SessionsPageProps = { ascending: false, columnTitle: "statementAge", }, + columns: null, + internalAppNamePrefix: "$ internal", refreshSessions: () => {}, cancelSession: (req: CancelSessionRequestMessage) => {}, cancelQuery: (req: CancelQueryRequestMessage) => {}, @@ -169,6 +181,7 @@ export const sessionsPagePropsFixture: SessionsPageProps = { }; export const sessionsPagePropsEmptyFixture: SessionsPageProps = { + filters: defaultFilters, history, location: { pathname: "/sessions", @@ -188,6 +201,8 @@ export const sessionsPagePropsEmptyFixture: SessionsPageProps = { ascending: false, columnTitle: "statementAge", }, + columns: null, + internalAppNamePrefix: "$ internal", refreshSessions: () => {}, cancelSession: (req: CancelSessionRequestMessage) => {}, cancelQuery: (req: CancelQueryRequestMessage) => {}, diff --git a/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsPage.tsx b/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsPage.tsx index 18707c4f4600..3850d1a20687 100644 --- a/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsPage.tsx +++ b/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsPage.tsx @@ -9,10 +9,9 @@ // licenses/APL.txt. 
import React from "react"; -import { isNil } from "lodash"; +import { isNil, merge } from "lodash"; import { syncHistory } from "src/util/query"; -import { appAttr } from "src/util/constants"; import { makeSessionsColumns, SessionInfo, @@ -24,16 +23,24 @@ import { sessionsTable } from "src/util/docs"; import emptyTableResultsIcon from "../assets/emptyState/empty-table-results.svg"; import SQLActivityError from "../sqlActivity/errorComponent"; - -import { Pagination, ResultsPerPageLabel } from "src/pagination"; +import { Pagination } from "src/pagination"; import { SortSetting, ISortedTablePagination, updateSortSettingQueryParamsOnTab, + ColumnDescriptor, } from "src/sortedtable"; import { Loading } from "src/loading"; import { Anchor } from "src/anchor"; import { EmptyTable } from "src/empty"; +import { + calculateActiveFilters, + defaultFilters, + Filter, + Filters, + getTimeValueInSeconds, + handleFiltersFromQueryString, +} from "../queryFilter"; import TerminateQueryModal, { TerminateQueryModalRef, @@ -49,12 +56,26 @@ import { import statementsPageStyles from "src/statementsPage/statementsPage.module.scss"; import sessionPageStyles from "./sessionPage.module.scss"; +import ColumnsSelector, { + SelectOption, +} from "../columnsSelector/columnsSelector"; +import { TimestampToMoment } from "src/util"; +import moment from "moment"; +import { + getLabel, + StatisticTableColumnKeys, +} from "../statsTableUtil/statsTableUtil"; +import { TableStatistics } from "../tableStatistics"; +import * as protos from "@cockroachlabs/crdb-protobuf-client"; + +type ISessionsResponse = protos.cockroach.server.serverpb.IListSessionsResponse; const statementsPageCx = classNames.bind(statementsPageStyles); const sessionsPageCx = classNames.bind(sessionPageStyles); export interface OwnProps { sessions: SessionInfo[]; + internalAppNamePrefix: string; sessionsError: Error | Error[]; sortSetting: SortSetting; refreshSessions: () => void; @@ -69,13 +90,30 @@ export interface OwnProps { onSessionClick?: () => void; onTerminateSessionClick?: () => void; onTerminateStatementClick?: () => void; + onColumnsChange?: (selectedColumns: string[]) => void; + onFilterChange?: (value: Filters) => void; + columns: string[]; + filters: Filters; } export interface SessionsPageState { + apps: string[]; pagination: ISortedTablePagination; + filters: Filters; + activeFilters?: number; } -export type SessionsPageProps = OwnProps & RouteComponentProps; +export type SessionsPageProps = OwnProps & RouteComponentProps; + +function getSessionAppFilterOptions(sessions: SessionInfo[]): string[] { + const uniqueAppNames = new Set( + sessions.map(s => + s.session.application_name ? s.session.application_name : "(unset)", + ), + ); + + return Array.from(uniqueAppNames); +} export class SessionsPage extends React.Component< SessionsPageProps, @@ -87,11 +125,16 @@ export class SessionsPage extends React.Component< constructor(props: SessionsPageProps) { super(props); this.state = { + filters: defaultFilters, + apps: [], pagination: { pageSize: 20, current: 1, }, }; + + const stateFromHistory = this.getStateFromHistory(); + this.state = merge(this.state, stateFromHistory); this.terminateSessionRef = React.createRef(); this.terminateQueryRef = React.createRef(); @@ -111,6 +154,21 @@ export class SessionsPage extends React.Component< } } + getStateFromHistory = (): Partial => { + const { history, filters, onFilterChange } = this.props; + + // Filters. 
+    const latestFilter = handleFiltersFromQueryString(
+      history,
+      filters,
+      onFilterChange,
+    );
+
+    return {
+      filters: latestFilter,
+    };
+  };
+
   changeSortSetting = (ss: SortSetting): void => {
     if (this.props.onSortingChange) {
       this.props.onSortingChange("Sessions", ss.columnTitle, ss.ascending);
@@ -161,35 +219,169 @@ export class SessionsPage extends React.Component<
     this.props.onPageChanged(current);
   };
 
+  onSubmitFilters = (filters: Filters): void => {
+    if (this.props.onFilterChange) {
+      this.props.onFilterChange(filters);
+    }
+
+    this.setState({
+      filters: filters,
+    });
+    this.resetPagination();
+    syncHistory(
+      {
+        app: filters.app,
+        timeNumber: filters.timeNumber,
+        timeUnit: filters.timeUnit,
+      },
+      this.props.history,
+    );
+  };
+
+  onClearFilters = (): void => {
+    if (this.props.onFilterChange) {
+      this.props.onFilterChange(defaultFilters);
+    }
+
+    this.setState({
+      filters: {
+        ...defaultFilters,
+      },
+    });
+    this.resetPagination();
+    syncHistory(
+      {
+        app: undefined,
+        timeNumber: undefined,
+        timeUnit: undefined,
+      },
+      this.props.history,
+    );
+  };
+
+  getFilteredSessionsData = (): {
+    sessions: SessionInfo[];
+    activeFilters: number;
+  } => {
+    const { filters } = this.state;
+    const { sessions, internalAppNamePrefix } = this.props;
+    if (!filters) {
+      return {
+        sessions: sessions,
+        activeFilters: 0,
+      };
+    }
+    const activeFilters = calculateActiveFilters(filters);
+    const timeValue = getTimeValueInSeconds(filters);
+    const filteredSessions = sessions
+      .filter((s: SessionInfo) => {
+        const isInternal = (s: SessionInfo) =>
+          s.session.application_name.startsWith(internalAppNamePrefix);
+        if (filters.app && filters.app != "All") {
+          const apps = filters.app.split(",");
+          let showInternal = false;
+          if (apps.includes(internalAppNamePrefix)) {
+            showInternal = true;
+          }
+          if (apps.includes("(unset)")) {
+            apps.push("");
+          }
+
+          return (
+            (showInternal && isInternal(s)) ||
+            apps.includes(s.session.application_name)
+          );
+        } else {
+          return !isInternal(s);
+        }
+      })
+      .filter((s: SessionInfo) => {
+        const sessionTime = moment().diff(
+          TimestampToMoment(s.session.start),
+          "seconds",
+        );
+        return sessionTime >= timeValue || timeValue === "empty";
+      });
+
+    return {
+      sessions: filteredSessions,
+      activeFilters,
+    };
+  };
+
   renderSessions = (): React.ReactElement => {
     const sessionsData = this.props.sessions;
-    const { pagination } = this.state;
+    const { pagination, filters } = this.state;
+    const { columns: userSelectedColumnsToShow, onColumnsChange } = this.props;
+
+    const {
+      sessions: sessionsToDisplay,
+      activeFilters,
+    } = this.getFilteredSessionsData();
+
+    const appNames = getSessionAppFilterOptions(sessionsData);
     const columns = makeSessionsColumns(
+      "session",
       this.terminateSessionRef,
       this.terminateQueryRef,
      this.props.onSessionClick,
       this.props.onTerminateStatementClick,
       this.props.onTerminateSessionClick,
     );
+
+    const isColumnSelected = (c: ColumnDescriptor) => {
+      return (
+        (!userSelectedColumnsToShow && c.showByDefault) ||
+        (userSelectedColumnsToShow &&
+          userSelectedColumnsToShow.includes(c.name)) ||
+        c.alwaysShow
+      );
+    };
+
+    const tableColumns = columns
+      .filter(c => !c.alwaysShow)
+      .map(
+        (c): SelectOption => ({
+          label: getLabel(c.name as StatisticTableColumnKeys),
+          value: c.name,
+          isSelected: isColumnSelected(c),
+        }),
+      );
+
+    const timeLabel = "Session duration runs longer than";
+    const displayColumns = columns.filter(c => isColumnSelected(c));
 
     return (
       <>
+ +
-

- + + -

+
(
{storyFn()}
-  ))
-  .add("with data", () => ...);
diff --git a/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsPageConnected.tsx b/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsPageConnected.tsx
index e0e7ef5205e1..0a2404f04bef 100644
--- a/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsPageConnected.tsx
+++ b/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsPageConnected.tsx
@@ -14,7 +14,7 @@ import { analyticsActions, AppState } from "src/store";
 import { SessionsState } from "src/store/sessions";
 import { createSelector } from "reselect";
 
-import { SessionsPage } from "./index";
+import { OwnProps, SessionsPage } from "./index";
 
 import { actions as sessionsActions } from "src/store/sessions";
 import { actions as localStorageActions } from "src/store/localStorage";
@@ -24,6 +24,23 @@ import {
   ICancelSessionRequest,
 } from "src/store/terminateQuery";
 import { Dispatch } from "redux";
+import { Filters } from "../queryFilter";
+import { sqlStatsSelector } from "../store/sqlStats/sqlStats.selector";
+
+export const selectSessionsData = createSelector(
+  sqlStatsSelector,
+  sessionsState => (sessionsState.valid ? sessionsState.data : null),
+);
+
+export const adminUISelector = createSelector(
+  (state: AppState) => state.adminUI,
+  adminUiState => adminUiState,
+);
+
+export const localStorageSelector = createSelector(
+  adminUISelector,
+  adminUiState => adminUiState.localStorage,
+);
 
 export const selectSessions = createSelector(
   (state: AppState) => state.adminUI.sessions,
@@ -37,17 +54,43 @@ export const selectSessions = createSelector(
   },
 );
 
+export const selectAppName = createSelector(
+  (state: AppState) => state.adminUI.sessions,
+  (state: SessionsState) => {
+    if (!state.data) {
+      return null;
+    }
+    return state.data.internal_app_name_prefix;
+  },
+);
+
 export const selectSortSetting = createSelector(
   (state: AppState) => state.adminUI.localStorage,
   localStorage => localStorage["sortSetting/SessionsPage"],
 );
 
+export const selectColumns = createSelector(
+  localStorageSelector,
+  localStorage =>
+    localStorage["showColumns/SessionsPage"]
+      ? localStorage["showColumns/SessionsPage"].split(",")
+      : null,
+);
+
+export const selectFilters = createSelector(
+  localStorageSelector,
+  localStorage => localStorage["filters/SessionsPage"],
+);
+
 export const SessionsPageConnected = withRouter(
   connect(
     (state: AppState, props: RouteComponentProps) => ({
       sessions: selectSessions(state),
+      internalAppNamePrefix: selectAppName(state),
       sessionsError: state.adminUI.sessions.lastError,
       sortSetting: selectSortSetting(state),
+      columns: selectColumns(state),
+      filters: selectFilters(state),
     }),
     (dispatch: Dispatch) => ({
       refreshSessions: () => dispatch(sessionsActions.refresh()),
@@ -95,6 +138,30 @@ export const SessionsPageConnected = withRouter(
         page: "Sessions",
         action: "Terminate Statement",
       }),
+      onFilterChange: (value: Filters) => {
+        dispatch(
+          analyticsActions.track({
+            name: "Filter Clicked",
+            page: "Sessions",
+            filterName: "app",
+            value: value.toString(),
+          }),
+        );
+        dispatch(
+          localStorageActions.update({
+            key: "filters/SessionsPage",
+            value: value,
+          }),
+        );
+      },
+      onColumnsChange: (selectedColumns: string[]) =>
+        dispatch(
+          localStorageActions.update({
+            key: "showColumns/SessionsPage",
+            value:
+              selectedColumns.length === 0 ? " " : selectedColumns.join(","),
+          }),
+        ),
     }),
   )(SessionsPage),
 );
diff --git a/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsTable.module.scss b/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsTable.module.scss
index 5cedecf0c1ea..acfd426160c3 100644
--- a/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsTable.module.scss
+++ b/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsTable.module.scss
@@ -23,6 +23,17 @@
   color: $colors--link;
 }
 
+.cl-table__col-query-text {
+  font-family: $font-family--monospace;
+  font-size: 12px;
+  display: inline-block;
+  max-width: 400px;
+  div {
+    font-size: $font-size--small;
+    @include line-clamp(2);
+  }
+}
+
 .cl-table__col-session {
   color: $colors--neutral-8;
   font-family: $font-family--base;
@@ -43,6 +54,20 @@
   }
 }
 
+.session-status-icon {
+  &__active {
+    height: 10px;
+    width: 20px;
+    fill: $colors--primary-green-3;
+  }
+
+  &__idle {
+    height: 10px;
+    width: 20px;
+    fill: $colors--functional-orange-3;
+  }
+}
+
 .code {
   font-family: $font-family--monospace;
   font-size: $font-size--small;
diff --git a/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsTable.tsx b/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsTable.tsx
index 00b20f595af9..1f814e462b42 100644
--- a/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsTable.tsx
+++ b/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsTable.tsx
@@ -11,13 +11,12 @@
 import classNames from "classnames/bind";
 import styles from "./sessionsTable.module.scss";
 
-import { SessionTableTitle } from "./sessionsTableContent";
 import { TimestampToMoment } from "src/util/convert";
-import { BytesWithPrecision, DATE_FORMAT } from "src/util/format";
+import { BytesWithPrecision } from "src/util/format";
 import { Link } from "react-router-dom";
 import React from "react";
 
-import { Moment } from "moment";
+import moment from "moment";
 import { cockroach } from "@cockroachlabs/crdb-protobuf-client";
 type ISession = cockroach.server.serverpb.Session;
@@ -26,8 +25,8 @@ import { TerminateSessionModalRef } from "./terminateSessionModal";
 import { TerminateQueryModalRef } from "./terminateQueryModal";
 import { ColumnDescriptor, SortedTable } from "src/sortedtable/sortedtable";
 
-import { Icon } from "antd";
-import { Ellipsis } from "@cockroachlabs/icons";
+import { Icon } from "@cockroachlabs/ui-components";
+import { CircleFilled } from "src/icon/circleFilled";
 
 import {
   Dropdown,
@@ -36,6 +35,11 @@ import {
 import { Button } from "src/button/button";
 import { Tooltip } from "@cockroachlabs/ui-components";
 import { computeOrUseStmtSummary } from "../util";
+import { StatementLinkTarget } from "../statementsTable";
+import {
+  statisticsTableTitles,
+  StatisticType,
+} from "../statsTableUtil/statsTableUtil";
 
 const cx = classNames.bind(styles);
 
@@ -61,59 +65,82 @@ const SessionLink = (props: { session: ISession; onClick?: () => void }) => {
   const { session, onClick } = props;
   const base = `/session`;
-  const start = TimestampToMoment(session.start);
   const sessionID = byteArrayToUuid(session.id);
 
   return (
- Session started at {start.format(DATE_FORMAT)}} - > - -
{start.fromNow(true)}
- -
+ +
{formatSessionStart(session)}
+
); }; -const AgeLabel = (props: { start: Moment; thingName: string }) => { +const StatementTableCell = (props: { session: ISession }) => { + const { session } = props; + + if (!(session.active_queries?.length > 0)) { + if (session.last_active_query == "") { + return
{"N/A"}
; + } + return
{session.last_active_query}
; + } + const stmt = session.active_queries[0]; + const sql = stmt.sql; + const stmtSummary = session.active_queries[0].sql_summary; + const stmtCellText = computeOrUseStmtSummary(sql, stmtSummary); return ( - - {props.thingName} started at {props.start.format(DATE_FORMAT)} - - } + - {props.start.fromNow(true)} - + {sql}}> +
{stmtCellText}
+
+ ); }; -const StatementTableCell = (props: { session: ISession }) => { - const { session } = props; +function formatSessionStart(session: ISession): string { + const formatStr = "MMM DD, YYYY [at] h:mm A"; + const start = moment.unix(Number(session.start.seconds)).utc(); - if (!(session.active_queries?.length > 0)) { + return start.format(formatStr); +} + +function formatStatementStart(session: ISession): string { + if (session.active_queries.length == 0) { return "N/A"; } - const stmt = session.active_queries[0].sql; - const stmtSummary = session.active_queries[0].sql_summary; - const stmtCellText = computeOrUseStmtSummary(stmt, stmtSummary); + const formatStr = "MMM DD, YYYY [at] h:mm A"; + const start = moment + .unix(Number(session.active_queries[0].start.seconds)) + .utc(); + + return start.format(formatStr); +} + +const SessionStatus = (props: { session: ISession }) => { + const { session } = props; + const status = session.active_queries.length > 0 ? "Active" : "Idle"; + const classname = + session.active_queries.length > 0 + ? "session-status-icon__active" + : "session-status-icon__idle"; return (
- {stmt}}> - {stmtCellText} - + + {status}
); }; export function makeSessionsColumns( + statType: StatisticType, terminateSessionRef?: React.RefObject, terminateQueryRef?: React.RefObject, onSessionClick?: () => void, @@ -122,51 +149,48 @@ export function makeSessionsColumns( ): ColumnDescriptor[] { const columns: ColumnDescriptor[] = [ { - name: "sessionAge", - title: SessionTableTitle.sessionAge, + name: "sessionStart", + title: statisticsTableTitles.sessionStart(statType), className: cx("cl-table__col-session"), cell: session => SessionLink({ session: session.session, onClick: onSessionClick }), + sort: session => session.session.start.seconds, + alwaysShow: true, + }, + { + name: "sessionDuration", + title: statisticsTableTitles.sessionDuration(statType), + className: cx("cl-table__col-session"), + cell: session => TimestampToMoment(session.session.start).fromNow(true), sort: session => TimestampToMoment(session.session.start).valueOf(), }, { - name: "txnAge", - title: SessionTableTitle.txnAge, + name: "status", + title: statisticsTableTitles.status(statType), className: cx("cl-table__col-session"), - cell: function(session: SessionInfo) { - if (session.session.active_txn) { - return AgeLabel({ - start: TimestampToMoment(session.session.active_txn.start), - thingName: "Transaction", - }); - } - return "N/A"; - }, - sort: session => session.session.active_txn?.start.seconds || 0, + cell: session => SessionStatus(session), + sort: session => session.session.active_queries.length, + }, + { + name: "mostRecentStatement", + title: statisticsTableTitles.mostRecentStatement(statType), + className: cx("cl-table__col-query-text"), + cell: session => StatementTableCell(session), + sort: session => session.session.last_active_query, }, { - name: "statementAge", - title: SessionTableTitle.statementAge, + name: "statementStartTime", + title: statisticsTableTitles.statementStartTime(statType), className: cx("cl-table__col-session"), - cell: function(session: SessionInfo) { - if (session.session.active_queries?.length > 0) { - return AgeLabel({ - start: TimestampToMoment(session.session.active_queries[0].start), - thingName: "Statement", - }); - } - return "N/A"; - }, - sort: function(session: SessionInfo): number { - if (session.session.active_queries?.length > 0) { - return session.session.active_queries[0].start.seconds.toNumber(); - } - return 0; - }, + cell: session => formatStatementStart(session.session), + sort: session => + session.session.active_queries.length > 0 + ? 
session.session.active_queries[0].start.seconds + : 0, }, { name: "memUsage", - title: SessionTableTitle.memUsage, + title: statisticsTableTitles.memUsage(statType), className: cx("cl-table__col-session"), cell: session => BytesWithPrecision(session.session.alloc_bytes?.toNumber(), 0) + @@ -175,14 +199,29 @@ export function makeSessionsColumns( sort: session => session.session.alloc_bytes?.toNumber(), }, { - name: "statement", - title: SessionTableTitle.statement, - className: cx("cl-table__col-session", "code"), - cell: session => StatementTableCell({ session: session.session }), + name: "clientAddress", + title: statisticsTableTitles.clientAddress(statType), + className: cx("cl-table__col-session"), + cell: session => session.session.client_address, + sort: session => session.session.client_address, + }, + { + name: "username", + title: statisticsTableTitles.username(statType), + className: cx("cl-table__col-session"), + cell: session => session.session.username, + sort: session => session.session.username, + }, + { + name: "applicationName", + title: statisticsTableTitles.applicationName(statType), + className: cx("cl-table__col-session"), + cell: session => session.session.application_name, + sort: session => session.session.application_name, }, { name: "actions", - title: SessionTableTitle.actions, + title: statisticsTableTitles.actions(statType), className: cx("cl-table__col-session-actions"), titleAlign: "right", cell: ({ session }) => { @@ -226,7 +265,11 @@ export function makeSessionsColumns( const renderDropdownToggleButton: JSX.Element = ( <> ); @@ -241,6 +284,7 @@ export function makeSessionsColumns( /> ); }, + alwaysShow: true, }, ]; diff --git a/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsTableContent.tsx b/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsTableContent.tsx deleted file mode 100644 index 1f52b02ba34f..000000000000 --- a/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsTableContent.tsx +++ /dev/null @@ -1,115 +0,0 @@ -// Copyright 2020 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -import React from "react"; -import { Tooltip } from "@cockroachlabs/ui-components"; - -export const SessionTableTitle = { - id: ( - - Session ID - - ), - statement: ( - - Statement - - ), - actions: ( - - Actions - - ), - sessionAge: ( - - Session Duration - - ), - txnAge: ( - - Transaction Duration - - ), - statementAge: ( - - Statement Duration - - ), - memUsage: ( - - Memory Usage - - ), - maxMemUsed: ( - - Maximum Memory Usage - - ), - numRetries: ( - - Retries - - ), - lastActiveStatement: ( - - Last Statement - - ), - numStmts: ( - - Statements Run - - ), -}; diff --git a/pkg/ui/workspaces/cluster-ui/src/statsTableUtil/statsTableUtil.tsx b/pkg/ui/workspaces/cluster-ui/src/statsTableUtil/statsTableUtil.tsx index 335563d019b3..7553fe1348fa 100644 --- a/pkg/ui/workspaces/cluster-ui/src/statsTableUtil/statsTableUtil.tsx +++ b/pkg/ui/workspaces/cluster-ui/src/statsTableUtil/statsTableUtil.tsx @@ -30,6 +30,20 @@ export type NodeNames = { [nodeId: string]: string }; // Single place for column names. Used in table columns and in columns selector. 
export const statisticsColumnLabels = { + sessionStart: "Session Start Time (UTC)", + sessionDuration: "Session Duration", + mostRecentStatement: "Most Recent Statement", + status: "Status", + statementStartTime: "Statement Start Time (UTC)", + txnDuration: "Transaction Duration", + actions: "Actions", + memUsage: "Memory Usage", + maxMemUsed: "Maximum Memory Usage", + numRetries: "Retries", + numStatements: "Statements Run", + clientAddress: "Client IP Address", + username: "User Name", + applicationName: "Application Name", bytesRead: "Bytes Read", contention: "Contention", database: "Database", @@ -58,7 +72,11 @@ export const contentModifiers = { statements: "statements", }; -export type StatisticType = "statement" | "transaction" | "transactionDetails"; +export type StatisticType = + | "statement" + | "session" + | "transaction" + | "transactionDetails"; export type StatisticTableColumnKeys = keyof typeof statisticsColumnLabels; type StatisticTableTitleType = { @@ -95,6 +113,170 @@ export function getLabel( // of data the statistics are based on (e.g. statements, transactions, or transactionDetails). The // StatisticType is used to modify the content of the tooltip. export const statisticsTableTitles: StatisticTableTitleType = { + sessionStart: () => { + return ( + + {getLabel("sessionStart")} + + ); + }, + sessionDuration: () => { + return ( + + {getLabel("sessionDuration")} + + ); + }, + status: () => { + return ( + + {getLabel("status")} + + ); + }, + mostRecentStatement: () => { + return ( + + {getLabel("mostRecentStatement")} + + ); + }, + statementStartTime: () => { + return ( + + {getLabel("statementStartTime")} + + ); + }, + memUsage: () => { + return ( + + {getLabel("memUsage")} + + ); + }, + clientAddress: () => { + return ( + + {getLabel("clientAddress")} + + ); + }, + username: () => { + return ( + + {getLabel("username")} + + ); + }, + applicationName: () => { + return ( + + {getLabel("applicationName")} + + ); + }, + actions: () => { + return ( + + {getLabel("actions")} + + ); + }, + maxMemUsed: () => { + return ( + + {getLabel("maxMemUsage")} + + ); + }, + numRetries: () => { + return ( + + {getLabel("retries")} + + ); + }, + numStatements: () => { + return ( + + {getLabel("numStatements")} + + ); + }, + txnDuration: () => { + return ( + + {getLabel("txnDuration")} + + ); + }, statements: () => { return ( ; +export const selectData = createSelector( + (state: AdminUIState) => state.cachedData.statements, + (state: CachedDataReducerState) => { + if (!state.data || state.inFlight || !state.valid) return null; + return state.data; + }, +); + export const selectSessions = createSelector( (state: SessionsState) => state.cachedData.sessions, (_state: SessionsState, props: RouteComponentProps) => props, @@ -42,15 +57,44 @@ export const selectSessions = createSelector( }, ); +export const selectAppName = createSelector( + (state: SessionsState) => state.cachedData.sessions, + (_state: SessionsState, props: RouteComponentProps) => props, + ( + state: CachedDataReducerState, + _: RouteComponentProps, + ) => { + if (!state.data) { + return null; + } + return state.data.internal_app_name_prefix; + }, +); + export const sortSettingLocalSetting = new LocalSetting( "sortSetting/SessionsPage", (state: AdminUIState) => state.localSettings, { ascending: false, columnTitle: "statementAge" }, ); +export const sessionColumnsLocalSetting = new LocalSetting( + "showColumns/SessionsPage", + (state: AdminUIState) => state.localSettings, + null, +); + +export const filtersLocalSetting = new 
LocalSetting(
+  "filters/SessionsPage",
+  (state: AdminUIState) => state.localSettings,
+  defaultFilters,
+);
+
 const SessionsPageConnected = withRouter(
   connect(
     (state: AdminUIState, props: RouteComponentProps) => ({
+      columns: sessionColumnsLocalSetting.selectorToArray(state),
+      internalAppNamePrefix: selectAppName(state, props),
+      filters: filtersLocalSetting.selector(state),
       sessions: selectSessions(state, props),
       sessionsError: state.cachedData.sessions.lastError,
       sortSetting: sortSettingLocalSetting.selector(state),
@@ -68,6 +112,11 @@ const SessionsPageConnected = withRouter(
         ascending: ascending,
         columnTitle: columnName,
       }),
+      onColumnsChange: (value: string[]) =>
+        sessionColumnsLocalSetting.set(
+          value.length === 0 ? " " : value.join(","),
+        ),
+      onFilterChange: (filters: Filters) => filtersLocalSetting.set(filters),
     },
   )(SessionsPage),
 );