Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
96537: streamingest: add a metric for replication cutover progress r=lidorcarmel a=lidorcarmel

We already have the progress info in the job as the percentage of ranges that were reverted. This commit adds the number of ranges that are left to be reverted as a metric.

Epic: CRDB-18752

Fixes: #96536

Release note: None

96701: clusterversion: remove TODOPreV22_1 r=RaduBerinde a=RaduBerinde

This change removes this constant and the obsolete code that uses it.

Release note: None
Epic: None

96711: kvstorage: move uninit replica creation to kvstorage r=tbg a=pavelkalinnikov

This commit factors out the code that creates an uninitialized replica in
storage, into the kvstorage package.

Part of #93898

Co-authored-by: Lidor Carmel <[email protected]>
Co-authored-by: Radu Berinde <[email protected]>
Co-authored-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
4 people committed Feb 7, 2023
4 parents f640acb + 45faeeb + 16878ed + 09a7998 commit 8022f2a
Show file tree
Hide file tree
Showing 16 changed files with 117 additions and 96 deletions.
1 change: 0 additions & 1 deletion pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,6 @@ go_test(
"//pkg/ccl/utilccl",
"//pkg/cloud",
"//pkg/cloud/impl:cloudimpl",
"//pkg/clusterversion",
"//pkg/gossip",
"//pkg/internal/sqlsmith",
"//pkg/jobs",
Expand Down
5 changes: 0 additions & 5 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
_ "github.com/cockroachdb/cockroach/pkg/ccl/partitionccl"
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
_ "github.com/cockroachdb/cockroach/pkg/cloud/impl" // registers cloud storage providers
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/internal/sqlsmith"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
Expand Down Expand Up @@ -297,10 +296,6 @@ func TestChangefeedIdleness(t *testing.T) {
changefeedbase.IdleTimeout.Override(
context.Background(), &s.Server.ClusterSettings().SV, 3*time.Second)

// Idleness functionality is version gated
knobs := s.TestingKnobs.Server.(*server.TestingKnobs)
knobs.BinaryVersionOverride = clusterversion.ByKey(clusterversion.TODOPreV22_1)

registry := s.Server.JobRegistry().(*jobs.Registry)
currentlyIdle := registry.MetricsStruct().JobMetrics[jobspb.TypeChangefeed].CurrentlyIdle
waitForIdleCount := func(numIdle int64) {
Expand Down
13 changes: 13 additions & 0 deletions pkg/ccl/streamingccl/streamingest/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,17 @@ var (
Measurement: "Job Updates",
Unit: metric.Unit_COUNT,
}
// This metric would be 0 until cutover begins, and then it will be updated to
// the total number of ranges that need to be reverted, and then gradually go
// down to 0 again. NB: that the number of ranges is the total number of
// ranges left to be reverted, but some may not have writes and therefore the
// revert will be a no-op for those ranges.
metaReplicationCutoverProgress = metric.Metadata{
Name: "replication.cutover_progress",
Help: "The number of ranges left to revert in order to complete an inflight cutover",
Measurement: "Ranges",
Unit: metric.Unit_COUNT,
}
)

// Metrics are for production monitoring of stream ingestion jobs.
Expand All @@ -136,6 +147,7 @@ type Metrics struct {
DataCheckpointSpanCount *metric.Gauge
FrontierCheckpointSpanCount *metric.Gauge
FrontierLagSeconds *metric.GaugeFloat64
ReplicationCutoverProgress *metric.Gauge
}

// MetricStruct implements the metric.Struct interface.
Expand Down Expand Up @@ -177,6 +189,7 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct {
DataCheckpointSpanCount: metric.NewGauge(metaDataCheckpointSpanCount),
FrontierCheckpointSpanCount: metric.NewGauge(metaFrontierCheckpointSpanCount),
FrontierLagSeconds: metric.NewGaugeFloat64(metaFrontierLagSeconds),
ReplicationCutoverProgress: metric.NewGauge(metaReplicationCutoverProgress),
}
return m
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,8 @@ func maybeRevertToCutoverTimestamp(

p := execCtx.(sql.JobExecContext)
db := p.ExecCfg().DB
j, err := p.ExecCfg().JobRegistry.LoadJob(ctx, ingestionJobID)
jobRegistry := p.ExecCfg().JobRegistry
j, err := jobRegistry.LoadJob(ctx, ingestionJobID)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -542,6 +543,8 @@ func maybeRevertToCutoverTimestamp(
if err != nil {
return err
}
m := jobRegistry.MetricsStruct().StreamIngest.(*Metrics)
m.ReplicationCutoverProgress.Update(int64(nRanges))
if origNRanges == -1 {
origNRanges = nRanges
}
Expand Down
13 changes: 13 additions & 0 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,9 @@ func TestCutoverFractionProgressed(t *testing.T) {
return jobs.UpdateHighwaterProgressed(cutover, md, ju)
}))

metrics := registry.MetricsStruct().StreamIngest.(*Metrics)
require.Equal(t, int64(0), metrics.ReplicationCutoverProgress.Value())

g := ctxgroup.WithContext(ctx)
g.GoCtx(func(ctx context.Context) error {
defer close(respRecvd)
Expand All @@ -491,6 +494,7 @@ func TestCutoverFractionProgressed(t *testing.T) {
"0.67": false,
"0.83": false,
}
var expectedRanges int64 = 6
g.GoCtx(func(ctx context.Context) error {
for {
select {
Expand All @@ -506,6 +510,14 @@ func TestCutoverFractionProgressed(t *testing.T) {
if _, ok := progressMap[s]; !ok {
t.Fatalf("unexpected progress fraction %s", s)
}
// We sometimes see the same progress, which is valid, no need to update
// the expected range count.
if expectedRanges != metrics.ReplicationCutoverProgress.Value() {
// There is progress, which means that another range was reverted,
// updated the expected range count.
expectedRanges--
}
require.Equal(t, expectedRanges, metrics.ReplicationCutoverProgress.Value())
progressMap[s] = true
continueRevert <- struct{}{}
}
Expand All @@ -514,6 +526,7 @@ func TestCutoverFractionProgressed(t *testing.T) {
require.NoError(t, g.Wait())
sip := loadProgress()
require.Equal(t, sip.GetFractionCompleted(), float32(1))
require.Equal(t, int64(0), metrics.ReplicationCutoverProgress.Value())

// Ensure we have hit all our expected progress fractions.
for k, v := range progressMap {
Expand Down
4 changes: 0 additions & 4 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,10 +420,6 @@ func (k Key) String() string {
return ByKey(k).String()
}

// TODOPreV22_1 is an alias for V22_1 for use in any version gate/check that
// previously referenced a < 22.1 version until that check/gate can be removed.
const TODOPreV22_1 = V22_1

// Offset every version +1M major versions into the future if this is a dev branch.
const DevOffset = 1000000

Expand Down
61 changes: 61 additions & 0 deletions pkg/kv/kvserver/kvstorage/replica_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ package kvstorage
import (
"context"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/errors"
"go.etcd.io/raft/v3/raftpb"
)
Expand Down Expand Up @@ -101,3 +103,62 @@ func (r LoadedReplicaState) check(storeID roachpb.StoreID) error {
}
return nil
}

// CreateUninitializedReplica creates an uninitialized replica in storage.
// Returns roachpb.RaftGroupDeletedError if this replica can not be created
// because it has been deleted.
func CreateUninitializedReplica(
ctx context.Context,
eng storage.Engine,
storeID roachpb.StoreID,
rangeID roachpb.RangeID,
replicaID roachpb.ReplicaID,
) error {
// Before creating the replica, see if there is a tombstone which would
// indicate that this replica has been removed.
tombstoneKey := keys.RangeTombstoneKey(rangeID)
var tombstone roachpb.RangeTombstone
if ok, err := storage.MVCCGetProto(
ctx, eng, tombstoneKey, hlc.Timestamp{}, &tombstone, storage.MVCCGetOptions{},
); err != nil {
return err
} else if ok && replicaID < tombstone.NextReplicaID {
return &roachpb.RaftGroupDeletedError{}
}

// Write the RaftReplicaID for this replica. This is the only place in the
// CockroachDB code that we are creating a new *uninitialized* replica.
// Note that it is possible that we have already created the HardState for
// an uninitialized replica, then crashed, and on recovery are receiving a
// raft message for the same or later replica.
// - Same replica: we are overwriting the RaftReplicaID with the same
// value, which is harmless.
// - Later replica: there may be an existing HardState for the older
// uninitialized replica with Commit=0 and non-zero Term and Vote. Using
// the Term and Vote values for that older replica in the context of
// this newer replica is harmless since it just limits the votes for
// this replica.
//
// Compatibility:
// - v21.2 and v22.1: v22.1 unilaterally introduces RaftReplicaID (an
// unreplicated range-id local key). If a v22.1 binary is rolled back at
// a node, the fact that RaftReplicaID was written is harmless to a
// v21.2 node since it does not read it. When a v21.2 drops an
// initialized range, the RaftReplicaID will also be deleted because the
// whole range-ID local key space is deleted.
// - v22.2: no changes: RaftReplicaID is written, but old Replicas may not
// have it yet.
// - v23.1: at startup, we remove any uninitialized replicas that have a
// HardState but no RaftReplicaID, see kvstorage.LoadAndReconcileReplicas.
// So after first call to this method we have the invariant that all replicas
// have a RaftReplicaID persisted.
sl := stateloader.Make(rangeID)
if err := sl.SetRaftReplicaID(ctx, eng, replicaID); err != nil {
return err
}

// Make sure that storage invariants for this uninitialized replica hold.
uninitDesc := roachpb.RangeDescriptor{RangeID: rangeID}
_, err := LoadReplicaState(ctx, eng, storeID, &uninitDesc, replicaID)
return err
}
52 changes: 2 additions & 50 deletions pkg/kv/kvserver/store_create_replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,8 @@ import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -211,54 +207,10 @@ func (s *Store) tryGetOrCreateReplica(
// be accessed by someone holding a reference to, or currently creating a
// Replica for this rangeID, and that's us.

// Before creating the replica, see if there is a tombstone which would
// indicate that this is a stale message.
tombstoneKey := keys.RangeTombstoneKey(rangeID)
var tombstone roachpb.RangeTombstone
if ok, err := storage.MVCCGetProto(
ctx, s.Engine(), tombstoneKey, hlc.Timestamp{}, &tombstone, storage.MVCCGetOptions{},
if err := kvstorage.CreateUninitializedReplica(
ctx, s.Engine(), s.StoreID(), rangeID, replicaID,
); err != nil {
return nil, false, err
} else if ok && replicaID < tombstone.NextReplicaID {
return nil, false, &roachpb.RaftGroupDeletedError{}
}

// Write the RaftReplicaID for this replica. This is the only place in the
// CockroachDB code that we are creating a new *uninitialized* replica.
// Note that it is possible that we have already created the HardState for
// an uninitialized replica, then crashed, and on recovery are receiving a
// raft message for the same or later replica.
// - Same replica: we are overwriting the RaftReplicaID with the same
// value, which is harmless.
// - Later replica: there may be an existing HardState for the older
// uninitialized replica with Commit=0 and non-zero Term and Vote. Using
// the Term and Vote values for that older replica in the context of
// this newer replica is harmless since it just limits the votes for
// this replica.
//
// Compatibility:
// - v21.2 and v22.1: v22.1 unilaterally introduces RaftReplicaID (an
// unreplicated range-id local key). If a v22.1 binary is rolled back at
// a node, the fact that RaftReplicaID was written is harmless to a
// v21.2 node since it does not read it. When a v21.2 drops an
// initialized range, the RaftReplicaID will also be deleted because the
// whole range-ID local key space is deleted.
// - v22.2: no changes: RaftReplicaID is written, but old Replicas may not
// have it yet.
// - v23.1: at startup, we remove any unitialized replicas that have a
// HardState but no RaftReplicaID, see kvstorage.LoadAndReconcileReplicas.
// So after first call to this method we have the invariant that all replicas
// have a RaftReplicaID persisted.
sl := stateloader.Make(rangeID)
if err := sl.SetRaftReplicaID(ctx, s.Engine(), replicaID); err != nil {
return nil, false, err
}

// Make sure that storage invariants for this uninitialized replica hold.
uninitDesc := roachpb.RangeDescriptor{RangeID: rangeID}
_, err := kvstorage.LoadReplicaState(ctx, s.Engine(), s.StoreID(), &uninitDesc, replicaID)
if err != nil {
return nil, false, err
}

// Create a new uninitialized replica and lock it for raft processing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ USE parent;
CREATE DATABASE pgdatabase;
USE test;

statement error pq: cannot perform ALTER DATABASE CONVERT TO SCHEMA in version
statement error pq: cannot perform ALTER DATABASE CONVERT TO SCHEMA
ALTER DATABASE parent CONVERT TO SCHEMA WITH PARENT pgdatabase
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ statement error pq: feature ALTER DATABASE is part of the schema change category
ALTER DATABASE d RENAME TO r

# Test REPARENT DATABASE
statement error pq: cannot perform ALTER DATABASE CONVERT TO SCHEMA in version
statement error pq: cannot perform ALTER DATABASE CONVERT TO SCHEMA
ALTER DATABASE d CONVERT TO SCHEMA WITH PARENT test

# Test ALTER TABLE PARTITION BY.
Expand Down
5 changes: 1 addition & 4 deletions pkg/sql/reparent_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ package sql
import (
"context"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand All @@ -22,7 +21,5 @@ import (
func (p *planner) ReparentDatabase(
ctx context.Context, n *tree.ReparentDatabase,
) (planNode, error) {
return nil, pgerror.Newf(pgcode.FeatureNotSupported,
"cannot perform ALTER DATABASE CONVERT TO SCHEMA in version %v and beyond",
clusterversion.TODOPreV22_1)
return nil, pgerror.Newf(pgcode.FeatureNotSupported, "cannot perform ALTER DATABASE CONVERT TO SCHEMA")
}
1 change: 0 additions & 1 deletion pkg/sql/syntheticprivilege/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilege",
visibility = ["//visibility:public"],
deps = [
"//pkg/clusterversion",
"//pkg/security/username",
"//pkg/sql/catalog/catpb",
"//pkg/sql/catalog/descpb",
Expand Down
6 changes: 0 additions & 6 deletions pkg/sql/syntheticprivilege/global_privilege.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
package syntheticprivilege

import (
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/opt/cat"
Expand All @@ -37,11 +36,6 @@ func (p *GlobalPrivilege) GetPath() string {
return "/global/"
}

// SystemPrivilegesTableVersionGate implements the Object interface.
func (p *GlobalPrivilege) SystemPrivilegesTableVersionGate() clusterversion.Key {
return clusterversion.TODOPreV22_1
}

// GlobalPrivilegeObject is one of one since it is global.
// We can use a const to identify it.
var GlobalPrivilegeObject = &GlobalPrivilege{}
Expand Down
9 changes: 4 additions & 5 deletions pkg/storage/min_version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,11 @@ func TestSetMinVersion(t *testing.T) {
defer p.Close()
require.Equal(t, pebble.FormatMostCompatible, p.db.FormatMajorVersion())

// Advancing the store cluster version to TODOPreV22_1
// should also advance the store's format major version.
err = p.SetMinVersion(clusterversion.ByKey(clusterversion.TODOPreV22_1))
// Advancing the store cluster version to V22_2 should also advance the
// store's format major version.
err = p.SetMinVersion(clusterversion.ByKey(clusterversion.V22_2))
require.NoError(t, err)
require.Equal(t, pebble.FormatSplitUserKeysMarked, p.db.FormatMajorVersion())

require.Equal(t, pebble.FormatPrePebblev1Marked, p.db.FormatMajorVersion())
}

func TestMinVersion_IsNotEncrypted(t *testing.T) {
Expand Down
30 changes: 13 additions & 17 deletions pkg/storage/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -1964,30 +1964,26 @@ func (p *Pebble) SetMinVersion(version roachpb.Version) error {
// at a version X+1, it is guaranteed that all nodes have already ratcheted
// their store version to the version X that enabled the feature at the Pebble
// level.
formatVers := pebble.FormatMostCompatible
var formatVers pebble.FormatMajorVersion
// Cases are ordered from newer to older versions.
switch {
case !version.Less(clusterversion.ByKey(clusterversion.V23_1EnsurePebbleFormatSSTableValueBlocks)):
if formatVers < pebble.FormatSSTableValueBlocks {
formatVers = pebble.FormatSSTableValueBlocks
}
formatVers = pebble.FormatSSTableValueBlocks

case !version.Less(clusterversion.ByKey(clusterversion.V22_2PebbleFormatPrePebblev1Marked)):
if formatVers < pebble.FormatPrePebblev1Marked {
formatVers = pebble.FormatPrePebblev1Marked
}
formatVers = pebble.FormatPrePebblev1Marked

case !version.Less(clusterversion.ByKey(clusterversion.V22_2EnsurePebbleFormatVersionRangeKeys)):
if formatVers < pebble.FormatRangeKeys {
formatVers = pebble.FormatRangeKeys
}
formatVers = pebble.FormatRangeKeys

case !version.Less(clusterversion.ByKey(clusterversion.V22_2PebbleFormatSplitUserKeysMarkedCompacted)):
if formatVers < pebble.FormatSplitUserKeysMarkedCompacted {
formatVers = pebble.FormatSplitUserKeysMarkedCompacted
}
case !version.Less(clusterversion.ByKey(clusterversion.TODOPreV22_1)):
if formatVers < pebble.FormatSplitUserKeysMarked {
formatVers = pebble.FormatSplitUserKeysMarked
}
formatVers = pebble.FormatSplitUserKeysMarkedCompacted

default:
// Corresponds to V22_1.
formatVers = pebble.FormatSplitUserKeysMarked
}

if p.db.FormatMajorVersion() < formatVers {
if err := p.db.RatchetFormatMajorVersion(formatVers); err != nil {
return errors.Wrap(err, "ratcheting format major version")
Expand Down
Loading

0 comments on commit 8022f2a

Please sign in to comment.