From 3ae8a88198896b0c788c71d2264549f234feafeb Mon Sep 17 00:00:00 2001
From: Xin Hao Zhang
Date: Tue, 21 Feb 2023 13:24:05 -0500
Subject: [PATCH 1/5] server: format jobs api query

Reformats the query string for the jobs admin endpoint.

Epic: none

Release note: None
---
 pkg/server/admin.go | 37 +++++++++++++++++++++++++++----------
 1 file changed, 27 insertions(+), 10 deletions(-)

diff --git a/pkg/server/admin.go b/pkg/server/admin.go
index 7d2370abf3b3..b581eb160db6 100644
--- a/pkg/server/admin.go
+++ b/pkg/server/admin.go
@@ -2250,16 +2250,33 @@ func jobsHelper(

 	q := makeSQLQuery()
 	q.Append(`
-      SELECT job_id, job_type, description, statement, user_name, descriptor_ids,
-      case
-        when ` + retryRunningCondition + ` then 'retry-running'
-        when ` + retryRevertingCondition + ` then 'retry-reverting'
-        else status
-      end as status, running_status, created, started, finished, modified, fraction_completed,
-      high_water_timestamp, error, last_run, next_run, num_runs, execution_events::string, coordinator_id
-      FROM crdb_internal.jobs
-      WHERE true
-      `)
+SELECT
+  job_id,
+  job_type,
+  description,
+  statement,
+  user_name,
+  descriptor_ids,
+  case
+    when ` + retryRunningCondition + ` then 'retry-running'
+    when ` + retryRevertingCondition + ` then 'retry-reverting'
+    else status
+  end as status,
+  running_status,
+  created,
+  started,
+  finished,
+  modified,
+  fraction_completed,
+  high_water_timestamp,
+  error,
+  last_run,
+  next_run,
+  num_runs,
+  execution_events::string,
+  coordinator_id
+FROM crdb_internal.jobs
+WHERE true`) // Simplifies filter construction below.
 	if req.Status == "retrying" {
 		q.Append(" AND ( ( " + retryRunningCondition + " ) OR ( " + retryRevertingCondition + " ) )")
 	} else if req.Status != "" {

From 2eca52164869c46ed0f438926b3a1ad8ef90e265 Mon Sep 17 00:00:00 2001
From: Xin Hao Zhang
Date: Tue, 21 Feb 2023 13:42:46 -0500
Subject: [PATCH 2/5] server, ui: remove interpreted jobs retrying status

This commit removes the 'Retrying' status from the jobs UX. Previously, we
were interpreting this status from the running status, which only added
confusion and incorrectness to the job status being displayed. The status
surfaced now aligns directly with what is shown in the `crdb_internal.jobs`
table.

Some missing job statuses were also added as request options to the 'Status'
dropdown, including:
- Pause Requested
- Cancel Requested
- Revert Failed

Fixes: #95712

Release note (ui change): Retrying is no longer a status shown in the jobs
page.
--- pkg/server/admin.go | 24 +++----- pkg/server/admin_test.go | 36 ------------ .../cluster-ui/src/jobs/jobsPage/jobsPage.tsx | 38 +++++++----- .../cluster-ui/src/jobs/util/jobOptions.tsx | 58 +++++++++---------- .../cluster-ui/src/jobs/util/jobStatus.tsx | 4 +- .../src/jobs/util/jobStatusCell.tsx | 40 +++---------- .../cluster-ui/src/jobs/util/progressBar.tsx | 5 +- 7 files changed, 71 insertions(+), 134 deletions(-) diff --git a/pkg/server/admin.go b/pkg/server/admin.go index b581eb160db6..a0ebe8f15e45 100644 --- a/pkg/server/admin.go +++ b/pkg/server/admin.go @@ -2245,8 +2245,6 @@ func jobsHelper( cfg *BaseConfig, sv *settings.Values, ) (_ *serverpb.JobsResponse, retErr error) { - retryRunningCondition := "status='running' AND next_run > now() AND num_runs > 1" - retryRevertingCondition := "status='reverting' AND next_run > now() AND num_runs > 1" q := makeSQLQuery() q.Append(` @@ -2257,11 +2255,7 @@ SELECT statement, user_name, descriptor_ids, - case - when ` + retryRunningCondition + ` then 'retry-running' - when ` + retryRevertingCondition + ` then 'retry-reverting' - else status - end as status, + status, running_status, created, started, @@ -2277,25 +2271,23 @@ SELECT coordinator_id FROM crdb_internal.jobs WHERE true`) // Simplifies filter construction below. - if req.Status == "retrying" { - q.Append(" AND ( ( " + retryRunningCondition + " ) OR ( " + retryRevertingCondition + " ) )") - } else if req.Status != "" { + if req.Status != "" { q.Append(" AND status = $", req.Status) } if req.Type != jobspb.TypeUnspecified { q.Append(" AND job_type = $", req.Type.String()) } else { // Don't show automatic jobs in the overview page. - q.Append(" AND (") + q.Append(" AND ( job_type NOT IN (") for idx, jobType := range jobspb.AutomaticJobTypes { - q.Append("job_type != $", jobType.String()) - if idx < len(jobspb.AutomaticJobTypes)-1 { - q.Append(" AND ") + if idx != 0 { + q.Append(", ") } + q.Append("$", jobType.String()) } - q.Append(" OR job_type IS NULL)") + q.Append(" ) OR job_type IS NULL)") } - q.Append("ORDER BY created DESC") + q.Append(" ORDER BY created DESC") if req.Limit > 0 { q.Append(" LIMIT $", tree.DInt(req.Limit)) } diff --git a/pkg/server/admin_test.go b/pkg/server/admin_test.go index b3a2b02e5505..e5f2680ef65a 100644 --- a/pkg/server/admin_test.go +++ b/pkg/server/admin_test.go @@ -1725,11 +1725,6 @@ func TestAdminAPIJobs(t *testing.T) { append(append([]int64{}, revertingOnlyIds...), retryRevertingIds...), []int64{}, }, - { - "jobs?status=retrying", - append(append([]int64{}, retryRunningIds...), retryRevertingIds...), - []int64{}, - }, { "jobs?status=pending", []int64{}, @@ -1807,11 +1802,6 @@ func TestAdminAPIJobsDetails(t *testing.T) { defer s.Stopper().Stop(context.Background()) sqlDB := sqlutils.MakeSQLRunner(conn) - runningOnlyIds := []int64{1, 3, 5} - revertingOnlyIds := []int64{2, 4, 6} - retryRunningIds := []int64{7} - retryRevertingIds := []int64{8} - now := timeutil.Now() encodedError := func(err error) *errors.EncodedError { @@ -1891,32 +1881,6 @@ func TestAdminAPIJobsDetails(t *testing.T) { t.Fatal(err) } - // test that the select statement correctly converts expected jobs to retry-____ statuses - expectedStatuses := []struct { - status string - ids []int64 - }{ - {"running", runningOnlyIds}, - {"reverting", revertingOnlyIds}, - {"retry-running", retryRunningIds}, - {"retry-reverting", retryRevertingIds}, - } - for _, expected := range expectedStatuses { - var jobsWithStatus []serverpb.JobResponse - for _, job := range res.Jobs { - for _, expectedID := range 
expected.ids { - if job.ID == expectedID { - jobsWithStatus = append(jobsWithStatus, job) - } - } - } - - require.Len(t, jobsWithStatus, len(expected.ids)) - for _, job := range jobsWithStatus { - assert.Equal(t, expected.status, job.Status) - } - } - // Trim down our result set to the jobs we injected. resJobs := append([]serverpb.JobResponse(nil), res.Jobs...) sort.Slice(resJobs, func(i, j int) bool { diff --git a/pkg/ui/workspaces/cluster-ui/src/jobs/jobsPage/jobsPage.tsx b/pkg/ui/workspaces/cluster-ui/src/jobs/jobsPage/jobsPage.tsx index 0b2c4528032e..01e26069e646 100644 --- a/pkg/ui/workspaces/cluster-ui/src/jobs/jobsPage/jobsPage.tsx +++ b/pkg/ui/workspaces/cluster-ui/src/jobs/jobsPage/jobsPage.tsx @@ -26,7 +26,14 @@ import { Pagination, ResultsPerPageLabel } from "src/pagination"; import { isSelectedColumn } from "src/columnsSelector/utils"; import { DATE_FORMAT_24_UTC, syncHistory, TimestampToMoment } from "src/util"; import { jobsColumnLabels, JobsTable, makeJobsColumns } from "./jobsTable"; -import { showOptions, statusOptions, typeOptions } from "../util"; +import { + showOptions, + statusOptions, + typeOptions, + isValidJobStatus, + defaultRequestOptions, + isValidJobType, +} from "../util"; import { commonStyles } from "src/common"; import sortableTableStyles from "src/sortedtable/sortedtable.module.scss"; @@ -108,8 +115,8 @@ export class JobsPage extends React.Component { } // Filter Status. - const status = searchParams.get("status") || undefined; - if (this.props.setStatus && status && status != this.props.status) { + const status = searchParams.get("status"); + if (this.props.setStatus && status && status !== this.props.status) { this.props.setStatus(status); } @@ -145,6 +152,17 @@ export class JobsPage extends React.Component { } componentDidUpdate(prevProps: JobsPageProps): void { + // Because we removed the retrying status, we add this check + // just in case there exists an app that attempts to load a non-existent + // status. 
+ if (!isValidJobStatus(this.props.status)) { + this.onStatusSelected(defaultRequestOptions.status); + } + + if (!isValidJobType(this.props.type)) { + this.onTypeSelected(defaultRequestOptions.type.toString()); + } + if ( prevProps.lastUpdated !== this.props.lastUpdated || prevProps.show !== this.props.show || @@ -274,27 +292,21 @@ export class JobsPage extends React.Component { Status:{" "} - { - statusOptions.find(option => option["value"] === status)[ - "name" - ] - } + {statusOptions.find(option => option.value === status)?.name} Type:{" "} { - typeOptions.find( - option => option["value"] === type.toString(), - )["name"] + typeOptions.find(option => option.value === type.toString()) + ?.name } - Show:{" "} - {showOptions.find(option => option["value"] === show)["name"]} + Show: {showOptions.find(option => option.value === show)?.name} diff --git a/pkg/ui/workspaces/cluster-ui/src/jobs/util/jobOptions.tsx b/pkg/ui/workspaces/cluster-ui/src/jobs/util/jobOptions.tsx index f51fafceed29..04541fd36f8f 100644 --- a/pkg/ui/workspaces/cluster-ui/src/jobs/util/jobOptions.tsx +++ b/pkg/ui/workspaces/cluster-ui/src/jobs/util/jobOptions.tsx @@ -33,12 +33,8 @@ export function jobToVisual(job: Job): JobStatusVisual { return JobStatusVisual.BadgeWithErrorMessage; case JOB_STATUS_RUNNING: return JobStatusVisual.ProgressBarWithDuration; - case JOB_STATUS_RETRY_RUNNING: - return JobStatusVisual.ProgressBarWithDuration; case JOB_STATUS_PENDING: return JobStatusVisual.BadgeWithMessage; - case JOB_STATUS_RETRY_REVERTING: - return JobStatusVisual.BadgeWithRetrying; case JOB_STATUS_CANCELED: case JOB_STATUS_CANCEL_REQUESTED: case JOB_STATUS_PAUSED: @@ -59,19 +55,14 @@ export const JOB_STATUS_CANCEL_REQUESTED = "cancel-requested"; export const JOB_STATUS_PAUSED = "paused"; export const JOB_STATUS_PAUSE_REQUESTED = "paused-requested"; export const JOB_STATUS_RUNNING = "running"; -export const JOB_STATUS_RETRY_RUNNING = "retry-running"; export const JOB_STATUS_PENDING = "pending"; export const JOB_STATUS_REVERTING = "reverting"; export const JOB_STATUS_REVERT_FAILED = "revert-failed"; -export const JOB_STATUS_RETRY_REVERTING = "retry-reverting"; -export function isRetrying(status: string): boolean { - return [JOB_STATUS_RETRY_RUNNING, JOB_STATUS_RETRY_REVERTING].includes( - status, - ); -} export function isRunning(status: string): boolean { - return [JOB_STATUS_RUNNING, JOB_STATUS_RETRY_RUNNING].includes(status); + return [JOB_STATUS_RUNNING, JOB_STATUS_REVERTING].some(s => + status.includes(s), + ); } export function isTerminalState(status: string): boolean { return [JOB_STATUS_SUCCEEDED, JOB_STATUS_FAILED].includes(status); @@ -79,16 +70,28 @@ export function isTerminalState(status: string): boolean { export const statusOptions = [ { value: "", name: "All" }, - { value: "succeeded", name: "Succeeded" }, - { value: "failed", name: "Failed" }, - { value: "paused", name: "Paused" }, - { value: "canceled", name: "Canceled" }, - { value: "running", name: "Running" }, - { value: "pending", name: "Pending" }, - { value: "reverting", name: "Reverting" }, - { value: "retrying", name: "Retrying" }, + { value: JOB_STATUS_SUCCEEDED, name: "Succeeded" }, + { value: JOB_STATUS_FAILED, name: "Failed" }, + { value: JOB_STATUS_PAUSED, name: "Paused" }, + { value: JOB_STATUS_PAUSE_REQUESTED, name: "Pause Requested" }, + { value: JOB_STATUS_CANCELED, name: "Canceled" }, + { value: JOB_STATUS_CANCEL_REQUESTED, name: "Cancel Requested" }, + { value: JOB_STATUS_RUNNING, name: "Running" }, + { value: JOB_STATUS_PENDING, name: 
"Pending" }, + { value: JOB_STATUS_REVERTING, name: "Reverting" }, + { value: JOB_STATUS_REVERT_FAILED, name: "Revert Failed" }, ]; +const ALL_JOB_STATUSES = new Set(statusOptions.map(option => option.value)); + +/** + * @param jobStatus job status - any string + * @returns Returns true if the job status string is a valid status. + */ +export function isValidJobStatus(jobStatus: string): boolean { + return ALL_JOB_STATUSES.has(jobStatus); +} + export function jobHasOneOfStatuses(job: Job, ...statuses: string[]): boolean { return statuses.indexOf(job.status) !== -1; } @@ -110,21 +113,10 @@ export const jobStatusToBadgeStatus = (status: string): BadgeStatus => { case JOB_STATUS_PAUSED: case JOB_STATUS_PAUSE_REQUESTED: case JOB_STATUS_REVERTING: - case JOB_STATUS_RETRY_REVERTING: default: return "default"; } }; -export const jobStatusToBadgeText = (status: string): string => { - switch (status) { - case JOB_STATUS_RETRY_REVERTING: - return JOB_STATUS_REVERTING; - case JOB_STATUS_RETRY_RUNNING: - return JOB_STATUS_RUNNING; - default: - return status; - } -}; const jobTypeKeys = Object.keys(JobType); @@ -216,6 +208,10 @@ export const typeOptions = [ }, ]; +export function isValidJobType(jobType: number): boolean { + return jobType >= 0 && jobType < jobTypeKeys.length; +} + export const showOptions = [ { value: "50", name: "Latest 50" }, { value: "0", name: "All" }, diff --git a/pkg/ui/workspaces/cluster-ui/src/jobs/util/jobStatus.tsx b/pkg/ui/workspaces/cluster-ui/src/jobs/util/jobStatus.tsx index efdfb379710d..30c25a96a333 100644 --- a/pkg/ui/workspaces/cluster-ui/src/jobs/util/jobStatus.tsx +++ b/pkg/ui/workspaces/cluster-ui/src/jobs/util/jobStatus.tsx @@ -13,7 +13,7 @@ import classNames from "classnames/bind"; import React from "react"; import { Duration } from "./duration"; -import { JobStatusVisual, isRetrying, jobToVisual } from "./jobOptions"; +import { JobStatusVisual, jobToVisual } from "./jobOptions"; import { JobStatusBadge, ProgressBar, @@ -54,7 +54,6 @@ export const JobStatus: React.FC = ({ ); case JobStatusVisual.ProgressBarWithDuration: { - const jobIsRetrying = isRetrying(job.status); return (
= ({ showPercentage={true} /> - {jobIsRetrying && } {job.running_status && (
{job.running_status} diff --git a/pkg/ui/workspaces/cluster-ui/src/jobs/util/jobStatusCell.tsx b/pkg/ui/workspaces/cluster-ui/src/jobs/util/jobStatusCell.tsx index 6eef6bec96db..5cb9deac075b 100644 --- a/pkg/ui/workspaces/cluster-ui/src/jobs/util/jobStatusCell.tsx +++ b/pkg/ui/workspaces/cluster-ui/src/jobs/util/jobStatusCell.tsx @@ -8,13 +8,9 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. import { cockroach } from "@cockroachlabs/crdb-protobuf-client"; -import { Tooltip } from "@cockroachlabs/ui-components"; import React from "react"; -import { TimestampToMoment } from "src/util"; -import { DATE_FORMAT_24_UTC } from "src/util/format"; import { JobStatus } from "./jobStatus"; -import { isRetrying } from "./jobOptions"; type Job = cockroach.server.serverpb.IJobResponse; @@ -30,31 +26,11 @@ export const JobStatusCell: React.FC = ({ lineWidth, compact = false, hideDuration = false, -}) => { - const jobStatus = ( - - ); - if (isRetrying(job.status)) { - return ( - - Next Planned Execution Time: -
- {TimestampToMoment(job.next_run).format(DATE_FORMAT_24_UTC)} - - } - > - {jobStatus} -
- ); - } - return jobStatus; -}; +}) => ( + +); diff --git a/pkg/ui/workspaces/cluster-ui/src/jobs/util/progressBar.tsx b/pkg/ui/workspaces/cluster-ui/src/jobs/util/progressBar.tsx index a8d057fbb7c1..56a320d24fdf 100644 --- a/pkg/ui/workspaces/cluster-ui/src/jobs/util/progressBar.tsx +++ b/pkg/ui/workspaces/cluster-ui/src/jobs/util/progressBar.tsx @@ -12,7 +12,7 @@ import { Line } from "rc-progress"; import React from "react"; import { Badge } from "src/badge"; -import { jobStatusToBadgeStatus, jobStatusToBadgeText } from "./jobOptions"; +import { jobStatusToBadgeStatus } from "./jobOptions"; import styles from "../jobs.module.scss"; import classNames from "classnames/bind"; @@ -25,8 +25,7 @@ export class JobStatusBadge extends React.PureComponent<{ jobStatus: string }> { render(): React.ReactElement { const jobStatus = this.props.jobStatus; const badgeStatus = jobStatusToBadgeStatus(jobStatus); - const badgeText = jobStatusToBadgeText(jobStatus); - return ; + return ; } } From 62d3f2e93a425e9bd49a439d97c51c0d5588eac3 Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Sun, 19 Feb 2023 15:57:14 -0500 Subject: [PATCH 3/5] c2c: gather perf metrics from prometheus c2c roachtest performance metrics are now gathered by a prom/grafana instance running locally on the roachprod cluster. This change allows us to gather and process any metrics exposed to the crdb prom endpoint. Specifically, we now gather: `capacity_used`, `replication_logical_bytes`, `replication_sst_bytes` at various points during the c2c roachtest, allowing us to measure: - Initial Scan Throughput: initial scan size / initial scan duration - Workload Throughput: data ingested during workload / workload duration - Cutover Throughput: (data ingested between cutover time and cutover cmd) / (cutover process duration) where the size of these operations can be measured as either physical replicated bytes, logical ingested bytes, or physical ingested bytes on the source cluster. This patch also fixes a recent bug which mislabeled src cluster throughput as initial scan throughput. 
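As a rough illustration of the throughput math described above (a sketch only:
the numbers are hypothetical, and the helper simply mirrors the calcThroughput
function added in this patch):

	package main

	import (
		"fmt"
		"time"
	)

	// calcThroughput estimates metric_unit/s/node over an interval, assuming
	// the node count stays constant for the duration of the interval.
	func calcThroughput(start, end float64, interval time.Duration, nodeCount int) float64 {
		return (end - start) / (interval.Seconds() * float64(nodeCount))
	}

	func main() {
		// Hypothetical initial scan: 120,000 logical MB ingested on a 4-node
		// source cluster over 30 minutes => 120000 / (1800 * 4) ≈ 16.7 MB/s/node.
		fmt.Printf("%.1f MB/s/node\n", calcThroughput(0, 120000, 30*time.Minute, 4))
	}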
Epic: None --- pkg/cmd/roachtest/tests/cluster_to_cluster.go | 256 +++++++++++------- 1 file changed, 161 insertions(+), 95 deletions(-) diff --git a/pkg/cmd/roachtest/tests/cluster_to_cluster.go b/pkg/cmd/roachtest/tests/cluster_to_cluster.go index dcf348b5dac6..52392d16ab9f 100644 --- a/pkg/cmd/roachtest/tests/cluster_to_cluster.go +++ b/pkg/cmd/roachtest/tests/cluster_to_cluster.go @@ -17,9 +17,11 @@ import ( "net/url" "os" "path/filepath" + "sort" "time" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/clusterstats" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec" @@ -30,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/roachprod/install" "github.com/cockroachdb/cockroach/pkg/roachprod/logger" + "github.com/cockroachdb/cockroach/pkg/roachprod/prometheus" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -67,11 +70,68 @@ type c2cSetup struct { metrics c2cMetrics } +var c2cPromMetrics = map[string]clusterstats.ClusterStat{ + "LogicalMegabytes": { + LabelName: "node", + Query: "replication_logical_bytes / 1e6"}, + "PhysicalMegabytes": { + LabelName: "node", + Query: "replication_sst_bytes / 1e6"}, + "PhysicalReplicatedMegabytes": { + LabelName: "node", + Query: "capacity_used / 1e6"}, +} + +func sumOverLabel(stats map[string]map[string]clusterstats.StatPoint, label string) float64 { + var mean float64 + for _, stat := range stats[label] { + mean += stat.Value + } + return mean +} + +func (cc *c2cSetup) startStatsCollection( + ctx context.Context, t test.Test, c cluster.Cluster, +) func(time.Time) map[string]float64 { + + if c.IsLocal() { + // Grafana does not run locally. + return func(snapTime time.Time) map[string]float64 { + return map[string]float64{} + } + } + // TODO(msbutler): pass a proper cluster replication dashboard and figure out why we need to + // pass a grafana dashboard for this to work + cfg := (&prometheus.Config{}). + WithPrometheusNode(cc.workloadNode.InstallNodes()[0]). + WithCluster(cc.dst.nodes.InstallNodes()). + WithNodeExporter(cc.dst.nodes.InstallNodes()). + WithGrafanaDashboard("https://go.crdb.dev/p/changefeed-roachtest-grafana-dashboard") + + require.NoError(t, c.StartGrafana(ctx, t.L(), cfg)) + t.L().Printf("Prom has started") + + client, err := clusterstats.SetupCollectorPromClient(ctx, c, t.L(), cfg) + require.NoError(t, err, "error creating prometheus client for stats collector") + collector := clusterstats.NewStatsCollector(ctx, client) + + return func(snapTime time.Time) map[string]float64 { + metricSnap := make(map[string]float64) + for name, stat := range c2cPromMetrics { + point, err := collector.CollectPoint(ctx, t.L(), snapTime, stat.Query) + if err != nil { + t.L().Errorf("Could not query prom %s", err.Error()) + } + metricSnap[name] = sumOverLabel(point, stat.LabelName) + t.L().Printf("%s: %.2f", name, metricSnap[name]) + } + return metricSnap + } +} + // DiskUsageTracker can grab the disk usage of the provided cluster. // -// TODO(msbutler): move DiskUsageTracker, exportedMetric, -// SizeTime and helper methods to an external package that all -// roachtests can use. 
+// TODO(msbutler): deprecate this, once restore roachtests also use prom setup. type DiskUsageTracker struct { c cluster.Cluster l *logger.Logger @@ -101,88 +161,94 @@ func NewDiskUsageTracker( return &DiskUsageTracker{c: c, l: diskLogger}, nil } -// exportedMetric describes a measurement created in the roachtest process that will export to -// roachperf or a prom/grafana instance. -// -// TODO(msbutler): currently, the exported metrics are merely printed at end of -// the roachtest. Refactor these methods to play nice with a roachtest prom endpoint, -// once it exists. -type exportedMetric struct { - metric float64 - unit string -} - -// newMetric creates a new exportedMetric -func newMetric(metric float64, unit string) exportedMetric { - return exportedMetric{metric, unit} +type metricSnapshot struct { + metrics map[string]float64 + time time.Time } -func (em exportedMetric) StringWithUnits() string { - return fmt.Sprintf("%.2f %s", em.metric, em.unit) -} - -func (em exportedMetric) String() string { - return fmt.Sprintf("%.2f", em.metric) -} - -// sizeTime captures the disk size of the nodes at some moment in time -type sizeTime struct { - // size is the megabytes of the objects - size int - time time.Time - nodeCount int -} - -func newSizeTime(ctx context.Context, du *DiskUsageTracker, nodes option.NodeListOption) sizeTime { - return sizeTime{ - size: du.GetDiskUsage(ctx, nodes), - time: timeutil.Now(), - nodeCount: len(nodes), +func newMetricSnapshot( + metricSnapper func(time.Time) map[string]float64, ts time.Time, +) metricSnapshot { + snap := metricSnapshot{ + time: ts, + metrics: metricSnapper(ts), } + return snap } -// diskDiffThroughput estimates throughput between two time intervals as mb/s/node by assuming -// that the total bytes written between the time intervals is diskUsage_End - diskUsage_Start. -func diskDiffThroughput(start sizeTime, end sizeTime) float64 { - if start.nodeCount != end.nodeCount { - panic("node count cannot change while measuring throughput") - } - return (float64(end.size-start.size) / end.time.Sub(start.time).Seconds()) / float64(start.nodeCount) +// calcThroughput estimates throughput between two time intervals as metric_unit/s/node +// for the provided metric, assuming the cluster had the same number of nodes +// over the interval. +func calcThroughput( + startMetric float64, endMetric float64, interval time.Duration, nodeCount int, +) float64 { + return (endMetric - startMetric) / (interval.Seconds() * float64(nodeCount)) } type c2cMetrics struct { - start sizeTime + initalScanStart metricSnapshot + + initialScanEnd metricSnapshot - initialScanEnd sizeTime + // cutoverTo records stats at the system time to which the dst cluster cuts over to. + cutoverTo metricSnapshot - cutoverStart sizeTime + cutoverStart metricSnapshot - cutoverEnd sizeTime + cutoverEnd metricSnapshot fingerprintingStart time.Time fingerprintingEnd time.Time } -func (m c2cMetrics) export() map[string]exportedMetric { - metrics := map[string]exportedMetric{} +// export summarizes all metrics gathered throughout the test. +func (m c2cMetrics) export(t test.Test, nodeCount int) { - populate := func(start sizeTime, end sizeTime, label string) { - metrics[label+"Duration"] = newMetric(end.time.Sub(start.time).Minutes(), "Minutes") + // aggregate aggregates metric snapshots across two time periods. A non-zero + // durationOverride will be used instead of the duration between the two + // passed in snapshots. 
+ aggregate := func( + start metricSnapshot, + end metricSnapshot, + label string, + durationOverride time.Duration) { + if start.metrics == nil || end.metrics == nil { + return + } - // Describes the cluster size difference between two timestamps. - metrics[label+"Size"] = newMetric(float64(end.size-start.size), "MB") - metrics[label+"Throughput"] = newMetric(diskDiffThroughput(start, end), "MB/S/Node") + metrics := map[string]float64{} + duration := durationOverride + if duration == 0 { + duration = end.time.Sub(start.time) + } + metrics["Duration Minutes"] = duration.Minutes() - } - if m.initialScanEnd.nodeCount != 0 { - populate(m.start, m.initialScanEnd, "InitialScan") - } + for metricName := range start.metrics { + metrics["Size_"+metricName] = end.metrics[metricName] - start.metrics[metricName] + metrics["Throughput_"+metricName+"_MB/S/Node"] = calcThroughput( + start.metrics[metricName], end.metrics[metricName], duration, nodeCount) + } - if m.cutoverEnd.nodeCount != 0 { - populate(m.cutoverStart, m.cutoverEnd, "Cutover") + // Print all the metrics for now while we wait for prom/grafana to visualize perf over time. + // Sort the metrics for pretty printing. + metricNames := make([]string, 0, len(metrics)) + for name := range metrics { + metricNames = append(metricNames, name) + } + sort.Strings(metricNames) + t.L().Printf("%s Perf:", label) + for _, name := range metricNames { + t.L().Printf("\t%s : %.2f", name, metrics[name]) + } } - return metrics + aggregate(m.initalScanStart, m.initialScanEnd, "InitialScan", 0) + + aggregate(m.initialScanEnd, m.cutoverStart, "Workload", 0) + + // The _amount_ of data processed during cutover should be the data ingested between the + // timestamp we cut over to and the start of the cutover process. + aggregate(m.cutoverTo, m.cutoverStart, "Cutover", m.cutoverEnd.time.Sub(m.cutoverStart.time)) } func setupC2C( @@ -195,12 +261,14 @@ func setupC2C( workloadNode := c.Node(srcKVNodes + dstKVNodes + 1) c.Put(ctx, t.DeprecatedWorkload(), "./workload", workloadNode) - srcStartOps := option.DefaultStartOpts() + // TODO(msbutler): allow for backups once this test stabilizes a bit more. + srcStartOps := option.DefaultStartOptsNoBackups() srcStartOps.RoachprodOpts.InitTarget = 1 srcClusterSetting := install.MakeClusterSettings(install.SecureOption(true)) c.Start(ctx, t.L(), srcStartOps, srcClusterSetting, srcCluster) - dstStartOps := option.DefaultStartOpts() + // TODO(msbutler): allow for backups once this test stabilizes a bit more. 
+ dstStartOps := option.DefaultStartOptsNoBackups() dstStartOps.RoachprodOpts.InitTarget = srcKVNodes + 1 dstClusterSetting := install.MakeClusterSettings(install.SecureOption(true)) c.Start(ctx, t.L(), dstStartOps, dstClusterSetting, dstCluster) @@ -245,11 +313,13 @@ func setupC2C( db: destDB, nodes: dstCluster} - return &c2cSetup{ + setup := &c2cSetup{ src: srcTenantInfo, dst: destTenantInfo, workloadNode: workloadNode, metrics: c2cMetrics{}} + + return setup } type streamingWorkload interface { @@ -366,23 +436,16 @@ func registerClusterToCluster(r registry.Registry) { Run: func(ctx context.Context, t test.Test, c cluster.Cluster) { setup := setupC2C(ctx, t, c, sp.srcNodes, sp.dstNodes) m := c.NewMonitor(ctx, setup.src.nodes.Merge(setup.dst.nodes)) - du, err := NewDiskUsageTracker(c, t.L()) - require.NoError(t, err) - var initDuration time.Duration + metricSnapper := setup.startStatsCollection(ctx, t, c) if initCmd := sp.workload.sourceInitCmd(setup.src.name, setup.src.nodes); initCmd != "" { t.Status("populating source cluster before replication") - setup.metrics.start = newSizeTime(ctx, du, setup.src.nodes) + initStart := timeutil.Now() c.Run(ctx, setup.workloadNode, initCmd) - setup.metrics.initialScanEnd = newSizeTime(ctx, du, setup.src.nodes) - - initDuration = setup.metrics.initialScanEnd.time.Sub(setup.metrics.start.time) - t.L().Printf("src cluster workload initialization took %s minutes", initDuration) + t.L().Printf("src cluster workload initialization took %s minutes", timeutil.Since(initStart).Minutes()) } - t.L().Printf("begin workload on src cluster") workloadCtx, workloadCancel := context.WithCancel(ctx) defer workloadCancel() - workloadDoneCh := make(chan struct{}) m.Go(func(ctx context.Context) error { err := c.RunE(workloadCtx, setup.workloadNode, @@ -399,6 +462,7 @@ func registerClusterToCluster(r registry.Registry) { }) t.Status("starting replication stream") + setup.metrics.initalScanStart = newMetricSnapshot(metricSnapper, timeutil.Now()) streamReplStmt := fmt.Sprintf("CREATE TENANT %q FROM REPLICATION OF %q ON '%s'", setup.dst.name, setup.src.name, setup.src.pgURL) setup.dst.sql.Exec(t, streamReplStmt) @@ -416,7 +480,7 @@ func registerClusterToCluster(r registry.Registry) { t.L().Printf("waiting for replication stream to finish ingesting initial scan") waitForHighWatermark(t, setup.dst.db, ingestionJobID, sp.timeout/2) - + setup.metrics.initialScanEnd = newMetricSnapshot(metricSnapper, timeutil.Now()) t.Status(fmt.Sprintf(`initial scan complete. run workload and repl. 
stream for another %s minutes`,
 				sp.additionalDuration))
@@ -435,9 +499,12 @@ func registerClusterToCluster(r registry.Registry) {
 			}
 			t.Status(fmt.Sprintf("waiting for replication stream to cutover to %s", cutoverTime.String()))
 			retainedTime := getReplicationRetainedTime(t, setup.dst.sql, roachpb.TenantName(setup.dst.name))
-			setup.metrics.cutoverStart = newSizeTime(ctx, du, setup.dst.nodes)
+			setup.metrics.cutoverTo = newMetricSnapshot(metricSnapper, cutoverTime)
+			setup.metrics.cutoverStart = newMetricSnapshot(metricSnapper, timeutil.Now())
 			stopReplicationStream(t, setup.dst.sql, ingestionJobID, cutoverTime)
-			setup.metrics.cutoverEnd = newSizeTime(ctx, du, setup.dst.nodes)
+			setup.metrics.cutoverEnd = newMetricSnapshot(metricSnapper, timeutil.Now())
+
+			setup.metrics.export(t, len(setup.src.nodes))

 			t.Status("comparing fingerprints")
 			compareTenantFingerprintsAtTimestamp(
@@ -449,20 +516,6 @@
 			)
 			lv.assertValid(t)

-			// TODO(msbutler): export metrics to roachperf or prom/grafana
-			exportedMetrics := setup.metrics.export()
-			t.L().Printf(`Initial Scan: Duration, Size, Throughput; Cutover: Duration, Size, Throughput`)
-			t.L().Printf(`%s %s %s %s %s %s`,
-				exportedMetrics["InitialScanDuration"].String(),
-				exportedMetrics["InitialScanSize"].String(),
-				exportedMetrics["InitialScanThroughput"].String(),
-				exportedMetrics["CutoverDuration"].String(),
-				exportedMetrics["CutoverSize"].String(),
-				exportedMetrics["CutoverThroughput"].String(),
-			)
-			for key, metric := range exportedMetrics {
-				t.L().Printf("%s: %s", key, metric.String())
-			}
 		},
 	})
 }
@@ -607,11 +660,24 @@ func stopReplicationStream(
 }

 func srcClusterSettings(t test.Test, db *sqlutils.SQLRunner) {
-	db.ExecMultiple(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true;`)
+	db.ExecMultiple(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true;`,
+		`SET CLUSTER SETTING kv.tenant_rate_limiter.burst_limit_seconds = 10000;`,
+		`SET CLUSTER SETTING kv.tenant_rate_limiter.rate_limit = -1000; `,
+		`SET CLUSTER SETTING kv.tenant_rate_limiter.read_batch_cost = 0;`,
+		`SET CLUSTER SETTING kv.tenant_rate_limiter.read_cost_per_mebibyte = 0;`,
+		`SET CLUSTER SETTING kv.tenant_rate_limiter.write_cost_per_megabyte = 0;`,
+		`SET CLUSTER SETTING kv.tenant_rate_limiter.write_request_cost = 0;`)
 }

 func destClusterSettings(t test.Test, db *sqlutils.SQLRunner) {
-	db.ExecMultiple(t, `SET CLUSTER SETTING cross_cluster_replication.enabled = true;`)
+	db.ExecMultiple(t, `SET CLUSTER SETTING cross_cluster_replication.enabled = true;`,
+		`SET CLUSTER SETTING kv.rangefeed.enabled = true;`,
+		`SET CLUSTER SETTING kv.tenant_rate_limiter.burst_limit_seconds = 10000;`,
+		`SET CLUSTER SETTING kv.tenant_rate_limiter.rate_limit = -1000; `,
+		`SET CLUSTER SETTING kv.tenant_rate_limiter.read_batch_cost = 0;`,
+		`SET CLUSTER SETTING kv.tenant_rate_limiter.read_cost_per_mebibyte = 0;`,
+		`SET CLUSTER SETTING kv.tenant_rate_limiter.write_cost_per_megabyte = 0;`,
+		`SET CLUSTER SETTING kv.tenant_rate_limiter.write_request_cost = 0;`)
 }

 func copyPGCertsAndMakeURL(

From d4abde8485bb8d4e5dc07b0f07f0c85823835038 Mon Sep 17 00:00:00 2001
From: Michael Butler
Date: Thu, 23 Feb 2023 10:37:53 -0500
Subject: [PATCH 4/5] c2c: refactor tenant ru limit removal in roachtests

This patch streamlines how we remove RU limiting for roachtests that use
tenants. For the c2c tests specifically, we now remove the limits on the dst
cluster tenant as soon as the replication stream begins.
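For other tenant-based roachtests, the intended usage is a single helper call
once the tenant exists. A minimal sketch (the surrounding test scaffolding is
hypothetical; removeTenantRateLimiters is the helper this patch adds to
multitenant_utils.go, and the statements mirror createInMemoryTenant):

	// Inside a roachtest body, with sysSQL connected to the system tenant:
	sysSQL := sqlutils.MakeSQLRunner(conn)
	sysSQL.Exec(t, "CREATE TENANT $1", tenantName)
	sysSQL.Exec(t, "ALTER TENANT $1 START SERVICE SHARED", tenantName)
	// Lift the RU and KV rate limits so throttling does not skew results.
	removeTenantRateLimiters(t, sysSQL, tenantName)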
Release note: None
---
 pkg/cmd/roachtest/tests/cluster_to_cluster.go | 59 ++++++++-----------
 pkg/cmd/roachtest/tests/multitenant_utils.go  | 24 +++++---
 2 files changed, 42 insertions(+), 41 deletions(-)

diff --git a/pkg/cmd/roachtest/tests/cluster_to_cluster.go b/pkg/cmd/roachtest/tests/cluster_to_cluster.go
index 52392d16ab9f..2e2beb87fa8e 100644
--- a/pkg/cmd/roachtest/tests/cluster_to_cluster.go
+++ b/pkg/cmd/roachtest/tests/cluster_to_cluster.go
@@ -56,8 +56,8 @@ type clusterInfo struct {
 	// db provides a connection to the system tenant
 	db *gosql.DB

-	// sql provides a sql connection to the host cluster
-	sql *sqlutils.SQLRunner
+	// sql provides a sql connection to the system tenant
+	sysSQL *sqlutils.SQLRunner

 	// nodes indicates the roachprod nodes running the cluster's nodes
 	nodes option.NodeListOption
@@ -300,18 +300,18 @@ func setupC2C(
 	require.NoError(t, err)

 	srcTenantInfo := clusterInfo{
-		name:  srcTenantName,
-		ID:    srcTenantID,
-		pgURL: pgURL,
-		sql:   srcSQL,
-		db:    srcDB,
-		nodes: srcCluster}
+		name:   srcTenantName,
+		ID:     srcTenantID,
+		pgURL:  pgURL,
+		sysSQL: srcSQL,
+		db:     srcDB,
+		nodes:  srcCluster}
 	destTenantInfo := clusterInfo{
-		name:  destTenantName,
-		ID:    destTenantID,
-		sql:   destSQL,
-		db:    destDB,
-		nodes: dstCluster}
+		name:   destTenantName,
+		ID:     destTenantID,
+		sysSQL: destSQL,
+		db:     destDB,
+		nodes:  dstCluster}

 	setup := &c2cSetup{
 		src:          srcTenantInfo,
@@ -463,10 +463,13 @@ func registerClusterToCluster(r registry.Registry) {

 			t.Status("starting replication stream")
 			setup.metrics.initalScanStart = newMetricSnapshot(metricSnapper, timeutil.Now())
+
+			// There's no need to remove the tenant limiters for this new app tenant, as
+			// all replication traffic flows through the system tenant.
 			streamReplStmt := fmt.Sprintf("CREATE TENANT %q FROM REPLICATION OF %q ON '%s'",
 				setup.dst.name, setup.src.name, setup.src.pgURL)
-			setup.dst.sql.Exec(t, streamReplStmt)
-			ingestionJobID := getIngestionJobID(t, setup.dst.sql, setup.dst.name)
+			setup.dst.sysSQL.Exec(t, streamReplStmt)
+			ingestionJobID := getIngestionJobID(t, setup.dst.sysSQL, setup.dst.name)

 			// TODO(ssd): The job doesn't record the initial
 			// statement time, so we can't correctly measure the
@@ -485,7 +488,7 @@ func registerClusterToCluster(r registry.Registry) {
 				sp.additionalDuration))

 			var currentTime time.Time
-			setup.dst.sql.QueryRow(t, "SELECT clock_timestamp()").Scan(&currentTime)
+			setup.dst.sysSQL.QueryRow(t, "SELECT clock_timestamp()").Scan(&currentTime)
 			cutoverTime := currentTime.Add(sp.additionalDuration - sp.cutover)
 			t.Status("cutover time chosen: ", cutoverTime.String())

@@ -498,10 +501,10 @@ func registerClusterToCluster(r registry.Registry) {
 				return
 			}
 			t.Status(fmt.Sprintf("waiting for replication stream to cutover to %s", cutoverTime.String()))
-			retainedTime := getReplicationRetainedTime(t, setup.dst.sql, roachpb.TenantName(setup.dst.name))
+			retainedTime := getReplicationRetainedTime(t, setup.dst.sysSQL, roachpb.TenantName(setup.dst.name))
 			setup.metrics.cutoverTo = newMetricSnapshot(metricSnapper, cutoverTime)
 			setup.metrics.cutoverStart = newMetricSnapshot(metricSnapper, timeutil.Now())
-			stopReplicationStream(t, setup.dst.sql, ingestionJobID, cutoverTime)
+			stopReplicationStream(t, setup.dst.sysSQL, ingestionJobID, cutoverTime)
 			setup.metrics.cutoverEnd = newMetricSnapshot(metricSnapper, timeutil.Now())

 			setup.metrics.export(t, len(setup.src.nodes))
@@ -548,14 +551,14 @@ AS OF SYSTEM TIME '%s'`, startTimeStr, aost)

 	var srcFingerprint int64
 	m.Go(func(ctx context.Context) error {
-		setup.src.sql.QueryRow(t,
fingerprintQuery, setup.src.ID).Scan(&srcFingerprint) + setup.src.sysSQL.QueryRow(t, fingerprintQuery, setup.src.ID).Scan(&srcFingerprint) return nil }) var destFingerprint int64 m.Go(func(ctx context.Context) error { // TODO(adityamaru): Measure and record fingerprinting throughput. setup.metrics.fingerprintingStart = timeutil.Now() - setup.dst.sql.QueryRow(t, fingerprintQuery, setup.dst.ID).Scan(&destFingerprint) + setup.dst.sysSQL.QueryRow(t, fingerprintQuery, setup.dst.ID).Scan(&destFingerprint) setup.metrics.fingerprintingEnd = timeutil.Now() fingerprintingDuration := setup.metrics.fingerprintingEnd.Sub(setup.metrics.fingerprintingStart).String() t.L().Printf("fingerprinting the destination tenant took %s", fingerprintingDuration) @@ -660,24 +663,12 @@ func stopReplicationStream( } func srcClusterSettings(t test.Test, db *sqlutils.SQLRunner) { - db.ExecMultiple(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true;`, - `SET CLUSTER SETTING kv.tenant_rate_limiter.burst_limit_seconds = 10000;`, - `SET CLUSTER SETTING kv.tenant_rate_limiter.rate_limit = -1000; `, - `SET CLUSTER SETTING kv.tenant_rate_limiter.read_batch_cost = 0;`, - `SET CLUSTER SETTING kv.tenant_rate_limiter.read_cost_per_mebibyte = 0;`, - `SET CLUSTER SETTING kv.tenant_rate_limiter.write_cost_per_megabyte = 0;`, - `SET CLUSTER SETTING kv.tenant_rate_limiter.write_request_cost = 0;`) + db.ExecMultiple(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true;`) } func destClusterSettings(t test.Test, db *sqlutils.SQLRunner) { db.ExecMultiple(t, `SET CLUSTER SETTING cross_cluster_replication.enabled = true;`, - `SET CLUSTER SETTING kv.rangefeed.enabled = true;`, - `SET CLUSTER SETTING kv.tenant_rate_limiter.burst_limit_seconds = 10000;`, - `SET CLUSTER SETTING kv.tenant_rate_limiter.rate_limit = -1000; `, - `SET CLUSTER SETTING kv.tenant_rate_limiter.read_batch_cost = 0;`, - `SET CLUSTER SETTING kv.tenant_rate_limiter.read_cost_per_mebibyte = 0;`, - `SET CLUSTER SETTING kv.tenant_rate_limiter.write_cost_per_megabyte = 0;`, - `SET CLUSTER SETTING kv.tenant_rate_limiter.write_request_cost = 0;`) + `SET CLUSTER SETTING kv.rangefeed.enabled = true;`) } func copyPGCertsAndMakeURL( diff --git a/pkg/cmd/roachtest/tests/multitenant_utils.go b/pkg/cmd/roachtest/tests/multitenant_utils.go index 35603c449a53..f1cf7a0ae4e8 100644 --- a/pkg/cmd/roachtest/tests/multitenant_utils.go +++ b/pkg/cmd/roachtest/tests/multitenant_utils.go @@ -313,6 +313,8 @@ func createInMemoryTenant( sysSQL.Exec(t, "CREATE TENANT $1", tenantName) sysSQL.Exec(t, "ALTER TENANT $1 START SERVICE SHARED", tenantName) + removeTenantRateLimiters(t, sysSQL, tenantName) + // Opening a SQL session to a newly created in-process tenant may require a // few retries. Unfortunately, the c.ConnE and MakeSQLRunner APIs do not make // it clear if they eagerly open a session with the tenant or wait until the @@ -331,14 +333,22 @@ func createInMemoryTenant( return nil }) - // Currently, a tenant has by default a 10m RU burst limit, which can be - // reached during these tests. To prevent RU limit throttling, add 10B RUs to - // the tenant. - var tenantID int - sysSQL.QueryRow(t, `SELECT id FROM [SHOW TENANT $1]`, tenantName).Scan(&tenantID) - sysSQL.Exec(t, `SELECT crdb_internal.update_tenant_resource_limits($1, 10000000000, 0, -10000000000, now(), 0);`, tenantID) if secure { createTenantAdminRole(t, tenantName, tenantSQL) } } + +// removeTenantRateLimiters ensures the tenant is not throttled by limiters. 
+func removeTenantRateLimiters(t test.Test, systemSQL *sqlutils.SQLRunner, tenantName string) { + var tenantID int + systemSQL.QueryRow(t, `SELECT id FROM [SHOW TENANT $1]`, tenantName).Scan(&tenantID) + systemSQL.Exec(t, `SELECT crdb_internal.update_tenant_resource_limits($1, 10000000000, 0, +10000000000, now(), 0);`, tenantID) + systemSQL.ExecMultiple(t, + `SET CLUSTER SETTING kv.tenant_rate_limiter.burst_limit_seconds = 10000;`, + `SET CLUSTER SETTING kv.tenant_rate_limiter.rate_limit = -1000; `, + `SET CLUSTER SETTING kv.tenant_rate_limiter.read_batch_cost = 0;`, + `SET CLUSTER SETTING kv.tenant_rate_limiter.read_cost_per_mebibyte = 0;`, + `SET CLUSTER SETTING kv.tenant_rate_limiter.write_cost_per_megabyte = 0;`, + `SET CLUSTER SETTING kv.tenant_rate_limiter.write_request_cost = 0;`) +} From 4610bc6ec489d2bfd26f02fa3e2cf3915d5e24a0 Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Thu, 23 Feb 2023 13:39:55 -0500 Subject: [PATCH 5/5] c2c: add c2c/tpcc/warehouses=1000/duration=60/cutover=30 roachtest Epic: none Release note: none --- pkg/cmd/roachtest/tests/cluster_to_cluster.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/pkg/cmd/roachtest/tests/cluster_to_cluster.go b/pkg/cmd/roachtest/tests/cluster_to_cluster.go index 2e2beb87fa8e..5f20b5ad262e 100644 --- a/pkg/cmd/roachtest/tests/cluster_to_cluster.go +++ b/pkg/cmd/roachtest/tests/cluster_to_cluster.go @@ -122,6 +122,7 @@ func (cc *c2cSetup) startStatsCollection( if err != nil { t.L().Errorf("Could not query prom %s", err.Error()) } + // TODO(msbutler): update the CollectPoint api to conduct the sum in Prom instead. metricSnap[name] = sumOverLabel(point, stat.LabelName) t.L().Printf("%s: %.2f", name, metricSnap[name]) } @@ -408,6 +409,21 @@ func registerClusterToCluster(r registry.Registry) { additionalDuration: 10 * time.Minute, cutover: 5 * time.Minute, }, + { + name: "c2c/tpcc/warehouses=1000/duration=60/cutover=30", + srcNodes: 4, + dstNodes: 4, + cpus: 8, + pdSize: 1000, + // 500 warehouses adds 30 GB to source + // + // TODO(msbutler): increase default test to 1000 warehouses once fingerprinting + // job speeds up. + workload: replicateTPCC{warehouses: 1000}, + timeout: 3 * time.Hour, + additionalDuration: 60 * time.Minute, + cutover: 30 * time.Minute, + }, { name: "c2c/kv0", srcNodes: 3,