diff --git a/pkg/cmd/roachtest/tests/cluster_to_cluster.go b/pkg/cmd/roachtest/tests/cluster_to_cluster.go index dcf348b5dac6..5f20b5ad262e 100644 --- a/pkg/cmd/roachtest/tests/cluster_to_cluster.go +++ b/pkg/cmd/roachtest/tests/cluster_to_cluster.go @@ -17,9 +17,11 @@ import ( "net/url" "os" "path/filepath" + "sort" "time" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/clusterstats" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec" @@ -30,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/roachprod/install" "github.com/cockroachdb/cockroach/pkg/roachprod/logger" + "github.com/cockroachdb/cockroach/pkg/roachprod/prometheus" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -53,8 +56,8 @@ type clusterInfo struct { // db provides a connection to the system tenant db *gosql.DB - // sql provides a sql connection to the host cluster - sql *sqlutils.SQLRunner + // sql provides a sql connection to the system tenant + sysSQL *sqlutils.SQLRunner // nodes indicates the roachprod nodes running the cluster's nodes nodes option.NodeListOption @@ -67,11 +70,69 @@ type c2cSetup struct { metrics c2cMetrics } +var c2cPromMetrics = map[string]clusterstats.ClusterStat{ + "LogicalMegabytes": { + LabelName: "node", + Query: "replication_logical_bytes / 1e6"}, + "PhysicalMegabytes": { + LabelName: "node", + Query: "replication_sst_bytes / 1e6"}, + "PhysicalReplicatedMegabytes": { + LabelName: "node", + Query: "capacity_used / 1e6"}, +} + +func sumOverLabel(stats map[string]map[string]clusterstats.StatPoint, label string) float64 { + var mean float64 + for _, stat := range stats[label] { + mean += stat.Value + } + return mean +} + +func (cc *c2cSetup) startStatsCollection( + ctx context.Context, t test.Test, c cluster.Cluster, +) func(time.Time) map[string]float64 { + + if c.IsLocal() { + // Grafana does not run locally. + return func(snapTime time.Time) map[string]float64 { + return map[string]float64{} + } + } + // TODO(msbutler): pass a proper cluster replication dashboard and figure out why we need to + // pass a grafana dashboard for this to work + cfg := (&prometheus.Config{}). + WithPrometheusNode(cc.workloadNode.InstallNodes()[0]). + WithCluster(cc.dst.nodes.InstallNodes()). + WithNodeExporter(cc.dst.nodes.InstallNodes()). + WithGrafanaDashboard("https://go.crdb.dev/p/changefeed-roachtest-grafana-dashboard") + + require.NoError(t, c.StartGrafana(ctx, t.L(), cfg)) + t.L().Printf("Prom has started") + + client, err := clusterstats.SetupCollectorPromClient(ctx, c, t.L(), cfg) + require.NoError(t, err, "error creating prometheus client for stats collector") + collector := clusterstats.NewStatsCollector(ctx, client) + + return func(snapTime time.Time) map[string]float64 { + metricSnap := make(map[string]float64) + for name, stat := range c2cPromMetrics { + point, err := collector.CollectPoint(ctx, t.L(), snapTime, stat.Query) + if err != nil { + t.L().Errorf("Could not query prom %s", err.Error()) + } + // TODO(msbutler): update the CollectPoint api to conduct the sum in Prom instead. 
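+			// sumOverLabel collapses the per-node samples into a single cluster-wide
+			// value: e.g. points of 100, 150, and 250 logical MB under the "node"
+			// label yield 500 for LogicalMegabytes at this snapTime.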
+ metricSnap[name] = sumOverLabel(point, stat.LabelName) + t.L().Printf("%s: %.2f", name, metricSnap[name]) + } + return metricSnap + } +} + // DiskUsageTracker can grab the disk usage of the provided cluster. // -// TODO(msbutler): move DiskUsageTracker, exportedMetric, -// SizeTime and helper methods to an external package that all -// roachtests can use. +// TODO(msbutler): deprecate this, once restore roachtests also use prom setup. type DiskUsageTracker struct { c cluster.Cluster l *logger.Logger @@ -101,88 +162,94 @@ func NewDiskUsageTracker( return &DiskUsageTracker{c: c, l: diskLogger}, nil } -// exportedMetric describes a measurement created in the roachtest process that will export to -// roachperf or a prom/grafana instance. -// -// TODO(msbutler): currently, the exported metrics are merely printed at end of -// the roachtest. Refactor these methods to play nice with a roachtest prom endpoint, -// once it exists. -type exportedMetric struct { - metric float64 - unit string -} - -// newMetric creates a new exportedMetric -func newMetric(metric float64, unit string) exportedMetric { - return exportedMetric{metric, unit} -} - -func (em exportedMetric) StringWithUnits() string { - return fmt.Sprintf("%.2f %s", em.metric, em.unit) -} - -func (em exportedMetric) String() string { - return fmt.Sprintf("%.2f", em.metric) +type metricSnapshot struct { + metrics map[string]float64 + time time.Time } -// sizeTime captures the disk size of the nodes at some moment in time -type sizeTime struct { - // size is the megabytes of the objects - size int - time time.Time - nodeCount int -} - -func newSizeTime(ctx context.Context, du *DiskUsageTracker, nodes option.NodeListOption) sizeTime { - return sizeTime{ - size: du.GetDiskUsage(ctx, nodes), - time: timeutil.Now(), - nodeCount: len(nodes), +func newMetricSnapshot( + metricSnapper func(time.Time) map[string]float64, ts time.Time, +) metricSnapshot { + snap := metricSnapshot{ + time: ts, + metrics: metricSnapper(ts), } + return snap } -// diskDiffThroughput estimates throughput between two time intervals as mb/s/node by assuming -// that the total bytes written between the time intervals is diskUsage_End - diskUsage_Start. -func diskDiffThroughput(start sizeTime, end sizeTime) float64 { - if start.nodeCount != end.nodeCount { - panic("node count cannot change while measuring throughput") - } - return (float64(end.size-start.size) / end.time.Sub(start.time).Seconds()) / float64(start.nodeCount) +// calcThroughput estimates throughput between two time intervals as metric_unit/s/node +// for the provided metric, assuming the cluster had the same number of nodes +// over the interval. +func calcThroughput( + startMetric float64, endMetric float64, interval time.Duration, nodeCount int, +) float64 { + return (endMetric - startMetric) / (interval.Seconds() * float64(nodeCount)) } type c2cMetrics struct { - start sizeTime + initalScanStart metricSnapshot - initialScanEnd sizeTime + initialScanEnd metricSnapshot - cutoverStart sizeTime + // cutoverTo records stats at the system time to which the dst cluster cuts over to. + cutoverTo metricSnapshot - cutoverEnd sizeTime + cutoverStart metricSnapshot + + cutoverEnd metricSnapshot fingerprintingStart time.Time fingerprintingEnd time.Time } -func (m c2cMetrics) export() map[string]exportedMetric { - metrics := map[string]exportedMetric{} +// export summarizes all metrics gathered throughout the test. 
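+// For each measured interval it logs the interval's Duration in minutes and,
+// for every collected Prom metric, a Size_<metric> delta plus a
+// Throughput_<metric>_MB/S/Node rate computed by calcThroughput.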
+func (m c2cMetrics) export(t test.Test, nodeCount int) { + + // aggregate aggregates metric snapshots across two time periods. A non-zero + // durationOverride will be used instead of the duration between the two + // passed in snapshots. + aggregate := func( + start metricSnapshot, + end metricSnapshot, + label string, + durationOverride time.Duration) { + if start.metrics == nil || end.metrics == nil { + return + } - populate := func(start sizeTime, end sizeTime, label string) { - metrics[label+"Duration"] = newMetric(end.time.Sub(start.time).Minutes(), "Minutes") + metrics := map[string]float64{} + duration := durationOverride + if duration == 0 { + duration = end.time.Sub(start.time) + } + metrics["Duration Minutes"] = duration.Minutes() - // Describes the cluster size difference between two timestamps. - metrics[label+"Size"] = newMetric(float64(end.size-start.size), "MB") - metrics[label+"Throughput"] = newMetric(diskDiffThroughput(start, end), "MB/S/Node") + for metricName := range start.metrics { + metrics["Size_"+metricName] = end.metrics[metricName] - start.metrics[metricName] + metrics["Throughput_"+metricName+"_MB/S/Node"] = calcThroughput( + start.metrics[metricName], end.metrics[metricName], duration, nodeCount) + } + // Print all the metrics for now while we wait for prom/grafana to visualize perf over time. + // Sort the metrics for pretty printing. + metricNames := make([]string, 0, len(metrics)) + for name := range metrics { + metricNames = append(metricNames, name) + } + sort.Strings(metricNames) + t.L().Printf("%s Perf:", label) + for _, name := range metricNames { + t.L().Printf("\t%s : %.2f", name, metrics[name]) + } } - if m.initialScanEnd.nodeCount != 0 { - populate(m.start, m.initialScanEnd, "InitialScan") - } + aggregate(m.initalScanStart, m.initialScanEnd, "InitialScan", 0) - if m.cutoverEnd.nodeCount != 0 { - populate(m.cutoverStart, m.cutoverEnd, "Cutover") - } - return metrics + aggregate(m.initialScanEnd, m.cutoverStart, "Workload", 0) + + // The _amount_ of data processed during cutover should be the data ingested between the + // timestamp we cut over to and the start of the cutover process. + aggregate(m.cutoverTo, m.cutoverStart, "Cutover", m.cutoverEnd.time.Sub(m.cutoverStart.time)) } func setupC2C( @@ -195,12 +262,14 @@ func setupC2C( workloadNode := c.Node(srcKVNodes + dstKVNodes + 1) c.Put(ctx, t.DeprecatedWorkload(), "./workload", workloadNode) - srcStartOps := option.DefaultStartOpts() + // TODO(msbutler): allow for backups once this test stabilizes a bit more. + srcStartOps := option.DefaultStartOptsNoBackups() srcStartOps.RoachprodOpts.InitTarget = 1 srcClusterSetting := install.MakeClusterSettings(install.SecureOption(true)) c.Start(ctx, t.L(), srcStartOps, srcClusterSetting, srcCluster) - dstStartOps := option.DefaultStartOpts() + // TODO(msbutler): allow for backups once this test stabilizes a bit more. 
+ dstStartOps := option.DefaultStartOptsNoBackups() dstStartOps.RoachprodOpts.InitTarget = srcKVNodes + 1 dstClusterSetting := install.MakeClusterSettings(install.SecureOption(true)) c.Start(ctx, t.L(), dstStartOps, dstClusterSetting, dstCluster) @@ -232,24 +301,26 @@ func setupC2C( require.NoError(t, err) srcTenantInfo := clusterInfo{ - name: srcTenantName, - ID: srcTenantID, - pgURL: pgURL, - sql: srcSQL, - db: srcDB, - nodes: srcCluster} + name: srcTenantName, + ID: srcTenantID, + pgURL: pgURL, + sysSQL: srcSQL, + db: srcDB, + nodes: srcCluster} destTenantInfo := clusterInfo{ - name: destTenantName, - ID: destTenantID, - sql: destSQL, - db: destDB, - nodes: dstCluster} + name: destTenantName, + ID: destTenantID, + sysSQL: destSQL, + db: destDB, + nodes: dstCluster} - return &c2cSetup{ + setup := &c2cSetup{ src: srcTenantInfo, dst: destTenantInfo, workloadNode: workloadNode, metrics: c2cMetrics{}} + + return setup } type streamingWorkload interface { @@ -338,6 +409,21 @@ func registerClusterToCluster(r registry.Registry) { additionalDuration: 10 * time.Minute, cutover: 5 * time.Minute, }, + { + name: "c2c/tpcc/warehouses=1000/duration=60/cutover=30", + srcNodes: 4, + dstNodes: 4, + cpus: 8, + pdSize: 1000, + // 500 warehouses adds 30 GB to source + // + // TODO(msbutler): increase default test to 1000 warehouses once fingerprinting + // job speeds up. + workload: replicateTPCC{warehouses: 1000}, + timeout: 3 * time.Hour, + additionalDuration: 60 * time.Minute, + cutover: 30 * time.Minute, + }, { name: "c2c/kv0", srcNodes: 3, @@ -366,23 +452,16 @@ func registerClusterToCluster(r registry.Registry) { Run: func(ctx context.Context, t test.Test, c cluster.Cluster) { setup := setupC2C(ctx, t, c, sp.srcNodes, sp.dstNodes) m := c.NewMonitor(ctx, setup.src.nodes.Merge(setup.dst.nodes)) - du, err := NewDiskUsageTracker(c, t.L()) - require.NoError(t, err) - var initDuration time.Duration + metricSnapper := setup.startStatsCollection(ctx, t, c) if initCmd := sp.workload.sourceInitCmd(setup.src.name, setup.src.nodes); initCmd != "" { t.Status("populating source cluster before replication") - setup.metrics.start = newSizeTime(ctx, du, setup.src.nodes) + initStart := timeutil.Now() c.Run(ctx, setup.workloadNode, initCmd) - setup.metrics.initialScanEnd = newSizeTime(ctx, du, setup.src.nodes) - - initDuration = setup.metrics.initialScanEnd.time.Sub(setup.metrics.start.time) - t.L().Printf("src cluster workload initialization took %s minutes", initDuration) + t.L().Printf("src cluster workload initialization took %s minutes", timeutil.Since(initStart).Minutes()) } - t.L().Printf("begin workload on src cluster") workloadCtx, workloadCancel := context.WithCancel(ctx) defer workloadCancel() - workloadDoneCh := make(chan struct{}) m.Go(func(ctx context.Context) error { err := c.RunE(workloadCtx, setup.workloadNode, @@ -399,10 +478,14 @@ func registerClusterToCluster(r registry.Registry) { }) t.Status("starting replication stream") + setup.metrics.initalScanStart = newMetricSnapshot(metricSnapper, timeutil.Now()) + + // There's no need to remove the tenant limiters for this new app tenant, as + // all replication traffic flows through the system tenant. 
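+			// The statement below renders to roughly (tenant names and the source
+			// pgURL come from setupC2C):
+			//   CREATE TENANT "<dst>" FROM REPLICATION OF "<src>" ON '<src pgURL>'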
streamReplStmt := fmt.Sprintf("CREATE TENANT %q FROM REPLICATION OF %q ON '%s'", setup.dst.name, setup.src.name, setup.src.pgURL) - setup.dst.sql.Exec(t, streamReplStmt) - ingestionJobID := getIngestionJobID(t, setup.dst.sql, setup.dst.name) + setup.dst.sysSQL.Exec(t, streamReplStmt) + ingestionJobID := getIngestionJobID(t, setup.dst.sysSQL, setup.dst.name) // TODO(ssd): The job doesn't record the initial // statement time, so we can't correctly measure the @@ -416,12 +499,12 @@ func registerClusterToCluster(r registry.Registry) { t.L().Printf("waiting for replication stream to finish ingesting initial scan") waitForHighWatermark(t, setup.dst.db, ingestionJobID, sp.timeout/2) - + setup.metrics.initialScanEnd = newMetricSnapshot(metricSnapper, timeutil.Now()) t.Status(fmt.Sprintf(`initial scan complete. run workload and repl. stream for another %s minutes`, sp.additionalDuration)) var currentTime time.Time - setup.dst.sql.QueryRow(t, "SELECT clock_timestamp()").Scan(¤tTime) + setup.dst.sysSQL.QueryRow(t, "SELECT clock_timestamp()").Scan(¤tTime) cutoverTime := currentTime.Add(sp.additionalDuration - sp.cutover) t.Status("cutover time chosen: ", cutoverTime.String()) @@ -434,10 +517,13 @@ func registerClusterToCluster(r registry.Registry) { return } t.Status(fmt.Sprintf("waiting for replication stream to cutover to %s", cutoverTime.String())) - retainedTime := getReplicationRetainedTime(t, setup.dst.sql, roachpb.TenantName(setup.dst.name)) - setup.metrics.cutoverStart = newSizeTime(ctx, du, setup.dst.nodes) - stopReplicationStream(t, setup.dst.sql, ingestionJobID, cutoverTime) - setup.metrics.cutoverEnd = newSizeTime(ctx, du, setup.dst.nodes) + retainedTime := getReplicationRetainedTime(t, setup.dst.sysSQL, roachpb.TenantName(setup.dst.name)) + setup.metrics.cutoverTo = newMetricSnapshot(metricSnapper, cutoverTime) + setup.metrics.cutoverStart = newMetricSnapshot(metricSnapper, timeutil.Now()) + stopReplicationStream(t, setup.dst.sysSQL, ingestionJobID, cutoverTime) + setup.metrics.cutoverEnd = newMetricSnapshot(metricSnapper, timeutil.Now()) + + setup.metrics.export(t, len(setup.src.nodes)) t.Status("comparing fingerprints") compareTenantFingerprintsAtTimestamp( @@ -449,20 +535,6 @@ func registerClusterToCluster(r registry.Registry) { ) lv.assertValid(t) - // TODO(msbutler): export metrics to roachperf or prom/grafana - exportedMetrics := setup.metrics.export() - t.L().Printf(`Initial Scan: Duration, Size, Throughput; Cutover: Duration, Size, Throughput`) - t.L().Printf(`%s %s %s %s %s %s`, - exportedMetrics["InitialScanDuration"].String(), - exportedMetrics["InitialScanSize"].String(), - exportedMetrics["InitialScanThroughput"].String(), - exportedMetrics["CutoverDuration"].String(), - exportedMetrics["CutoverSize"].String(), - exportedMetrics["CutoverThroughput"].String(), - ) - for key, metric := range exportedMetrics { - t.L().Printf("%s: %s", key, metric.String()) - } }, }) } @@ -495,14 +567,14 @@ AS OF SYSTEM TIME '%s'`, startTimeStr, aost) var srcFingerprint int64 m.Go(func(ctx context.Context) error { - setup.src.sql.QueryRow(t, fingerprintQuery, setup.src.ID).Scan(&srcFingerprint) + setup.src.sysSQL.QueryRow(t, fingerprintQuery, setup.src.ID).Scan(&srcFingerprint) return nil }) var destFingerprint int64 m.Go(func(ctx context.Context) error { // TODO(adityamaru): Measure and record fingerprinting throughput. 
setup.metrics.fingerprintingStart = timeutil.Now() - setup.dst.sql.QueryRow(t, fingerprintQuery, setup.dst.ID).Scan(&destFingerprint) + setup.dst.sysSQL.QueryRow(t, fingerprintQuery, setup.dst.ID).Scan(&destFingerprint) setup.metrics.fingerprintingEnd = timeutil.Now() fingerprintingDuration := setup.metrics.fingerprintingEnd.Sub(setup.metrics.fingerprintingStart).String() t.L().Printf("fingerprinting the destination tenant took %s", fingerprintingDuration) @@ -611,7 +683,8 @@ func srcClusterSettings(t test.Test, db *sqlutils.SQLRunner) { } func destClusterSettings(t test.Test, db *sqlutils.SQLRunner) { - db.ExecMultiple(t, `SET CLUSTER SETTING cross_cluster_replication.enabled = true;`) + db.ExecMultiple(t, `SET CLUSTER SETTING cross_cluster_replication.enabled = true;`, + `SET CLUSTER SETTING kv.rangefeed.enabled = true;`) } func copyPGCertsAndMakeURL( diff --git a/pkg/cmd/roachtest/tests/multitenant_utils.go b/pkg/cmd/roachtest/tests/multitenant_utils.go index 35603c449a53..f1cf7a0ae4e8 100644 --- a/pkg/cmd/roachtest/tests/multitenant_utils.go +++ b/pkg/cmd/roachtest/tests/multitenant_utils.go @@ -313,6 +313,8 @@ func createInMemoryTenant( sysSQL.Exec(t, "CREATE TENANT $1", tenantName) sysSQL.Exec(t, "ALTER TENANT $1 START SERVICE SHARED", tenantName) + removeTenantRateLimiters(t, sysSQL, tenantName) + // Opening a SQL session to a newly created in-process tenant may require a // few retries. Unfortunately, the c.ConnE and MakeSQLRunner APIs do not make // it clear if they eagerly open a session with the tenant or wait until the @@ -331,14 +333,22 @@ func createInMemoryTenant( return nil }) - // Currently, a tenant has by default a 10m RU burst limit, which can be - // reached during these tests. To prevent RU limit throttling, add 10B RUs to - // the tenant. - var tenantID int - sysSQL.QueryRow(t, `SELECT id FROM [SHOW TENANT $1]`, tenantName).Scan(&tenantID) - sysSQL.Exec(t, `SELECT crdb_internal.update_tenant_resource_limits($1, 10000000000, 0, -10000000000, now(), 0);`, tenantID) if secure { createTenantAdminRole(t, tenantName, tenantSQL) } } + +// removeTenantRateLimiters ensures the tenant is not throttled by limiters. 
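+// It grants the tenant a very large RU budget via
+// crdb_internal.update_tenant_resource_limits and zeroes out the
+// kv.tenant_rate_limiter cost settings, so neither RU accounting nor the KV
+// rate limiter throttles test traffic.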
+func removeTenantRateLimiters(t test.Test, systemSQL *sqlutils.SQLRunner, tenantName string) { + var tenantID int + systemSQL.QueryRow(t, `SELECT id FROM [SHOW TENANT $1]`, tenantName).Scan(&tenantID) + systemSQL.Exec(t, `SELECT crdb_internal.update_tenant_resource_limits($1, 10000000000, 0, +10000000000, now(), 0);`, tenantID) + systemSQL.ExecMultiple(t, + `SET CLUSTER SETTING kv.tenant_rate_limiter.burst_limit_seconds = 10000;`, + `SET CLUSTER SETTING kv.tenant_rate_limiter.rate_limit = -1000; `, + `SET CLUSTER SETTING kv.tenant_rate_limiter.read_batch_cost = 0;`, + `SET CLUSTER SETTING kv.tenant_rate_limiter.read_cost_per_mebibyte = 0;`, + `SET CLUSTER SETTING kv.tenant_rate_limiter.write_cost_per_megabyte = 0;`, + `SET CLUSTER SETTING kv.tenant_rate_limiter.write_request_cost = 0;`) +} diff --git a/pkg/server/admin.go b/pkg/server/admin.go index 7d2370abf3b3..a0ebe8f15e45 100644 --- a/pkg/server/admin.go +++ b/pkg/server/admin.go @@ -2245,40 +2245,49 @@ func jobsHelper( cfg *BaseConfig, sv *settings.Values, ) (_ *serverpb.JobsResponse, retErr error) { - retryRunningCondition := "status='running' AND next_run > now() AND num_runs > 1" - retryRevertingCondition := "status='reverting' AND next_run > now() AND num_runs > 1" q := makeSQLQuery() q.Append(` - SELECT job_id, job_type, description, statement, user_name, descriptor_ids, - case - when ` + retryRunningCondition + ` then 'retry-running' - when ` + retryRevertingCondition + ` then 'retry-reverting' - else status - end as status, running_status, created, started, finished, modified, fraction_completed, - high_water_timestamp, error, last_run, next_run, num_runs, execution_events::string, coordinator_id - FROM crdb_internal.jobs - WHERE true - `) - if req.Status == "retrying" { - q.Append(" AND ( ( " + retryRunningCondition + " ) OR ( " + retryRevertingCondition + " ) )") - } else if req.Status != "" { +SELECT + job_id, + job_type, + description, + statement, + user_name, + descriptor_ids, + status, + running_status, + created, + started, + finished, + modified, + fraction_completed, + high_water_timestamp, + error, + last_run, + next_run, + num_runs, + execution_events::string, + coordinator_id +FROM crdb_internal.jobs +WHERE true`) // Simplifies filter construction below. + if req.Status != "" { q.Append(" AND status = $", req.Status) } if req.Type != jobspb.TypeUnspecified { q.Append(" AND job_type = $", req.Type.String()) } else { // Don't show automatic jobs in the overview page. 
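 		// That is, drop every type listed in jobspb.AutomaticJobTypes while still
 		// keeping rows whose job_type is NULL.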
- q.Append(" AND (") + q.Append(" AND ( job_type NOT IN (") for idx, jobType := range jobspb.AutomaticJobTypes { - q.Append("job_type != $", jobType.String()) - if idx < len(jobspb.AutomaticJobTypes)-1 { - q.Append(" AND ") + if idx != 0 { + q.Append(", ") } + q.Append("$", jobType.String()) } - q.Append(" OR job_type IS NULL)") + q.Append(" ) OR job_type IS NULL)") } - q.Append("ORDER BY created DESC") + q.Append(" ORDER BY created DESC") if req.Limit > 0 { q.Append(" LIMIT $", tree.DInt(req.Limit)) } diff --git a/pkg/server/admin_test.go b/pkg/server/admin_test.go index b3a2b02e5505..e5f2680ef65a 100644 --- a/pkg/server/admin_test.go +++ b/pkg/server/admin_test.go @@ -1725,11 +1725,6 @@ func TestAdminAPIJobs(t *testing.T) { append(append([]int64{}, revertingOnlyIds...), retryRevertingIds...), []int64{}, }, - { - "jobs?status=retrying", - append(append([]int64{}, retryRunningIds...), retryRevertingIds...), - []int64{}, - }, { "jobs?status=pending", []int64{}, @@ -1807,11 +1802,6 @@ func TestAdminAPIJobsDetails(t *testing.T) { defer s.Stopper().Stop(context.Background()) sqlDB := sqlutils.MakeSQLRunner(conn) - runningOnlyIds := []int64{1, 3, 5} - revertingOnlyIds := []int64{2, 4, 6} - retryRunningIds := []int64{7} - retryRevertingIds := []int64{8} - now := timeutil.Now() encodedError := func(err error) *errors.EncodedError { @@ -1891,32 +1881,6 @@ func TestAdminAPIJobsDetails(t *testing.T) { t.Fatal(err) } - // test that the select statement correctly converts expected jobs to retry-____ statuses - expectedStatuses := []struct { - status string - ids []int64 - }{ - {"running", runningOnlyIds}, - {"reverting", revertingOnlyIds}, - {"retry-running", retryRunningIds}, - {"retry-reverting", retryRevertingIds}, - } - for _, expected := range expectedStatuses { - var jobsWithStatus []serverpb.JobResponse - for _, job := range res.Jobs { - for _, expectedID := range expected.ids { - if job.ID == expectedID { - jobsWithStatus = append(jobsWithStatus, job) - } - } - } - - require.Len(t, jobsWithStatus, len(expected.ids)) - for _, job := range jobsWithStatus { - assert.Equal(t, expected.status, job.Status) - } - } - // Trim down our result set to the jobs we injected. resJobs := append([]serverpb.JobResponse(nil), res.Jobs...) sort.Slice(resJobs, func(i, j int) bool { diff --git a/pkg/ui/workspaces/cluster-ui/src/jobs/jobsPage/jobsPage.tsx b/pkg/ui/workspaces/cluster-ui/src/jobs/jobsPage/jobsPage.tsx index cd13b934f511..b9528c3e6405 100644 --- a/pkg/ui/workspaces/cluster-ui/src/jobs/jobsPage/jobsPage.tsx +++ b/pkg/ui/workspaces/cluster-ui/src/jobs/jobsPage/jobsPage.tsx @@ -26,7 +26,14 @@ import { Pagination, ResultsPerPageLabel } from "src/pagination"; import { isSelectedColumn } from "src/columnsSelector/utils"; import { DATE_FORMAT_24_UTC, syncHistory, TimestampToMoment } from "src/util"; import { jobsColumnLabels, JobsTable, makeJobsColumns } from "./jobsTable"; -import { showOptions, statusOptions, typeOptions } from "../util"; +import { + showOptions, + statusOptions, + typeOptions, + isValidJobStatus, + defaultRequestOptions, + isValidJobType, +} from "../util"; import { commonStyles } from "src/common"; import sortableTableStyles from "src/sortedtable/sortedtable.module.scss"; @@ -108,8 +115,8 @@ export class JobsPage extends React.Component { } // Filter Status. 
- const status = searchParams.get("status") || undefined; - if (this.props.setStatus && status && status != this.props.status) { + const status = searchParams.get("status"); + if (this.props.setStatus && status && status !== this.props.status) { this.props.setStatus(status); } @@ -145,6 +152,17 @@ export class JobsPage extends React.Component { } componentDidUpdate(prevProps: JobsPageProps): void { + // Because we removed the retrying status, we add this check + // just in case there exists an app that attempts to load a non-existent + // status. + if (!isValidJobStatus(this.props.status)) { + this.onStatusSelected(defaultRequestOptions.status); + } + + if (!isValidJobType(this.props.type)) { + this.onTypeSelected(defaultRequestOptions.type.toString()); + } + if ( prevProps.lastUpdated !== this.props.lastUpdated || prevProps.show !== this.props.show || @@ -273,27 +291,21 @@ export class JobsPage extends React.Component { Status:{" "} - { - statusOptions.find(option => option["value"] === status)[ - "name" - ] - } + {statusOptions.find(option => option.value === status)?.name} Type:{" "} { - typeOptions.find( - option => option["value"] === type.toString(), - )["name"] + typeOptions.find(option => option.value === type.toString()) + ?.name } - Show:{" "} - {showOptions.find(option => option["value"] === show)["name"]} + Show: {showOptions.find(option => option.value === show)?.name} diff --git a/pkg/ui/workspaces/cluster-ui/src/jobs/util/jobOptions.tsx b/pkg/ui/workspaces/cluster-ui/src/jobs/util/jobOptions.tsx index f51fafceed29..04541fd36f8f 100644 --- a/pkg/ui/workspaces/cluster-ui/src/jobs/util/jobOptions.tsx +++ b/pkg/ui/workspaces/cluster-ui/src/jobs/util/jobOptions.tsx @@ -33,12 +33,8 @@ export function jobToVisual(job: Job): JobStatusVisual { return JobStatusVisual.BadgeWithErrorMessage; case JOB_STATUS_RUNNING: return JobStatusVisual.ProgressBarWithDuration; - case JOB_STATUS_RETRY_RUNNING: - return JobStatusVisual.ProgressBarWithDuration; case JOB_STATUS_PENDING: return JobStatusVisual.BadgeWithMessage; - case JOB_STATUS_RETRY_REVERTING: - return JobStatusVisual.BadgeWithRetrying; case JOB_STATUS_CANCELED: case JOB_STATUS_CANCEL_REQUESTED: case JOB_STATUS_PAUSED: @@ -59,19 +55,14 @@ export const JOB_STATUS_CANCEL_REQUESTED = "cancel-requested"; export const JOB_STATUS_PAUSED = "paused"; export const JOB_STATUS_PAUSE_REQUESTED = "paused-requested"; export const JOB_STATUS_RUNNING = "running"; -export const JOB_STATUS_RETRY_RUNNING = "retry-running"; export const JOB_STATUS_PENDING = "pending"; export const JOB_STATUS_REVERTING = "reverting"; export const JOB_STATUS_REVERT_FAILED = "revert-failed"; -export const JOB_STATUS_RETRY_REVERTING = "retry-reverting"; -export function isRetrying(status: string): boolean { - return [JOB_STATUS_RETRY_RUNNING, JOB_STATUS_RETRY_REVERTING].includes( - status, - ); -} export function isRunning(status: string): boolean { - return [JOB_STATUS_RUNNING, JOB_STATUS_RETRY_RUNNING].includes(status); + return [JOB_STATUS_RUNNING, JOB_STATUS_REVERTING].some(s => + status.includes(s), + ); } export function isTerminalState(status: string): boolean { return [JOB_STATUS_SUCCEEDED, JOB_STATUS_FAILED].includes(status); @@ -79,16 +70,28 @@ export function isTerminalState(status: string): boolean { export const statusOptions = [ { value: "", name: "All" }, - { value: "succeeded", name: "Succeeded" }, - { value: "failed", name: "Failed" }, - { value: "paused", name: "Paused" }, - { value: "canceled", name: "Canceled" }, - { value: "running", name: "Running" }, - { 
value: "pending", name: "Pending" }, - { value: "reverting", name: "Reverting" }, - { value: "retrying", name: "Retrying" }, + { value: JOB_STATUS_SUCCEEDED, name: "Succeeded" }, + { value: JOB_STATUS_FAILED, name: "Failed" }, + { value: JOB_STATUS_PAUSED, name: "Paused" }, + { value: JOB_STATUS_PAUSE_REQUESTED, name: "Pause Requested" }, + { value: JOB_STATUS_CANCELED, name: "Canceled" }, + { value: JOB_STATUS_CANCEL_REQUESTED, name: "Cancel Requested" }, + { value: JOB_STATUS_RUNNING, name: "Running" }, + { value: JOB_STATUS_PENDING, name: "Pending" }, + { value: JOB_STATUS_REVERTING, name: "Reverting" }, + { value: JOB_STATUS_REVERT_FAILED, name: "Revert Failed" }, ]; +const ALL_JOB_STATUSES = new Set(statusOptions.map(option => option.value)); + +/** + * @param jobStatus job status - any string + * @returns Returns true if the job status string is a valid status. + */ +export function isValidJobStatus(jobStatus: string): boolean { + return ALL_JOB_STATUSES.has(jobStatus); +} + export function jobHasOneOfStatuses(job: Job, ...statuses: string[]): boolean { return statuses.indexOf(job.status) !== -1; } @@ -110,21 +113,10 @@ export const jobStatusToBadgeStatus = (status: string): BadgeStatus => { case JOB_STATUS_PAUSED: case JOB_STATUS_PAUSE_REQUESTED: case JOB_STATUS_REVERTING: - case JOB_STATUS_RETRY_REVERTING: default: return "default"; } }; -export const jobStatusToBadgeText = (status: string): string => { - switch (status) { - case JOB_STATUS_RETRY_REVERTING: - return JOB_STATUS_REVERTING; - case JOB_STATUS_RETRY_RUNNING: - return JOB_STATUS_RUNNING; - default: - return status; - } -}; const jobTypeKeys = Object.keys(JobType); @@ -216,6 +208,10 @@ export const typeOptions = [ }, ]; +export function isValidJobType(jobType: number): boolean { + return jobType >= 0 && jobType < jobTypeKeys.length; +} + export const showOptions = [ { value: "50", name: "Latest 50" }, { value: "0", name: "All" }, diff --git a/pkg/ui/workspaces/cluster-ui/src/jobs/util/jobStatus.tsx b/pkg/ui/workspaces/cluster-ui/src/jobs/util/jobStatus.tsx index efdfb379710d..30c25a96a333 100644 --- a/pkg/ui/workspaces/cluster-ui/src/jobs/util/jobStatus.tsx +++ b/pkg/ui/workspaces/cluster-ui/src/jobs/util/jobStatus.tsx @@ -13,7 +13,7 @@ import classNames from "classnames/bind"; import React from "react"; import { Duration } from "./duration"; -import { JobStatusVisual, isRetrying, jobToVisual } from "./jobOptions"; +import { JobStatusVisual, jobToVisual } from "./jobOptions"; import { JobStatusBadge, ProgressBar, @@ -54,7 +54,6 @@ export const JobStatus: React.FC = ({ ); case JobStatusVisual.ProgressBarWithDuration: { - const jobIsRetrying = isRetrying(job.status); return (
= ({ showPercentage={true} /> - {jobIsRetrying && } {job.running_status && (
{job.running_status} diff --git a/pkg/ui/workspaces/cluster-ui/src/jobs/util/jobStatusCell.tsx b/pkg/ui/workspaces/cluster-ui/src/jobs/util/jobStatusCell.tsx index 6eef6bec96db..5cb9deac075b 100644 --- a/pkg/ui/workspaces/cluster-ui/src/jobs/util/jobStatusCell.tsx +++ b/pkg/ui/workspaces/cluster-ui/src/jobs/util/jobStatusCell.tsx @@ -8,13 +8,9 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. import { cockroach } from "@cockroachlabs/crdb-protobuf-client"; -import { Tooltip } from "@cockroachlabs/ui-components"; import React from "react"; -import { TimestampToMoment } from "src/util"; -import { DATE_FORMAT_24_UTC } from "src/util/format"; import { JobStatus } from "./jobStatus"; -import { isRetrying } from "./jobOptions"; type Job = cockroach.server.serverpb.IJobResponse; @@ -30,31 +26,11 @@ export const JobStatusCell: React.FC = ({ lineWidth, compact = false, hideDuration = false, -}) => { - const jobStatus = ( - - ); - if (isRetrying(job.status)) { - return ( - - Next Planned Execution Time: -
-            {TimestampToMoment(job.next_run).format(DATE_FORMAT_24_UTC)}
-          </span>
-        }
-      >
-        {jobStatus}
-      </Tooltip>
-    );
-  }
-  return jobStatus;
-};
+}) => (
+  <JobStatus
+    compact={compact}
+    job={job}
+    lineWidth={lineWidth}
+    hideDuration={hideDuration}
+  />
+);
diff --git a/pkg/ui/workspaces/cluster-ui/src/jobs/util/progressBar.tsx b/pkg/ui/workspaces/cluster-ui/src/jobs/util/progressBar.tsx
index a8d057fbb7c1..56a320d24fdf 100644
--- a/pkg/ui/workspaces/cluster-ui/src/jobs/util/progressBar.tsx
+++ b/pkg/ui/workspaces/cluster-ui/src/jobs/util/progressBar.tsx
@@ -12,7 +12,7 @@ import { Line } from "rc-progress";
 import React from "react";
 
 import { Badge } from "src/badge";
-import { jobStatusToBadgeStatus, jobStatusToBadgeText } from "./jobOptions";
+import { jobStatusToBadgeStatus } from "./jobOptions";
 import styles from "../jobs.module.scss";
 import classNames from "classnames/bind";
@@ -25,8 +25,7 @@ export class JobStatusBadge extends React.PureComponent<{ jobStatus: string }> {
   render(): React.ReactElement {
     const jobStatus = this.props.jobStatus;
     const badgeStatus = jobStatusToBadgeStatus(jobStatus);
-    const badgeText = jobStatusToBadgeText(jobStatus);
-    return <Badge status={badgeStatus} text={badgeText} />;
+    return <Badge status={badgeStatus} text={jobStatus} />;
   }
 }
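As a quick illustration of how the new snapshot-based metrics turn into the numbers that c2cMetrics.export logs, here is a small, self-contained Go sketch. It reuses the calcThroughput logic added in cluster_to_cluster.go; the snapshot values, interval, and node count below are invented for the example and are not part of this change.

package main

import (
	"fmt"
	"time"
)

// calcThroughput mirrors the roachtest helper: metric_unit/s/node over an
// interval, assuming a fixed node count.
func calcThroughput(start, end float64, interval time.Duration, nodeCount int) float64 {
	return (end - start) / (interval.Seconds() * float64(nodeCount))
}

func main() {
	// Hypothetical snapshots bracketing an initial scan on a 4-node cluster
	// (values in MB, chosen only for illustration).
	start := map[string]float64{"LogicalMegabytes": 0, "PhysicalMegabytes": 0}
	end := map[string]float64{"LogicalMegabytes": 30000, "PhysicalMegabytes": 12000}
	duration := 25 * time.Minute
	nodes := 4

	for name, s := range start {
		fmt.Printf("Size_%s : %.2f\n", name, end[name]-s)
		fmt.Printf("Throughput_%s_MB/S/Node : %.2f\n", name,
			calcThroughput(s, end[name], duration, nodes))
	}
}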