migration: introduce fence versions

The migrations infrastructure makes use of internal fence versions when
stepping through consecutive versions. We'll need to first bump the
fence version for each intermediate cluster version, before bumping the
"real" one. Doing so allows us to provide the invariant that whenever a
cluster version is active, all nodes in the cluster (including ones
added concurrently during version upgrades) are running binaries that
know about the version.

It's instructive to walk through how we expect a version migration from
v21.1 to v21.2 to take place, and how we behave in the presence of new
v21.1 or v21.2 nodes being added to the cluster.
  - All nodes are running v21.1
  - All nodes are rolled into v21.2 binaries, but with active cluster
    version still as v21.1
  - The first version bump will be into v21.2(fence=1); see the
    migration manager for where that happens
Then concurrently:
  - A new node is added to the cluster, but running binary v21.1
  - We try bumping the cluster gates to v21.2(fence=1)

If the v21.1 node manages to sneak in before the version bump, that's
fine, as the version bump is a no-op (all fence version bumps are). Any
subsequent bumps (including the "actual" one to v21.2) will fail during
the validation step, where we first check that all nodes are running
v21.2 binaries.

If the v21.1 node is only added after v21.2(fence=1) is active, it won't
be able to actually join the cluster (it'll be prevented by the join
RPC).

                              ---

We rely on the invariant introduced earlier that requires new
user-defined versions (users being crdb engineers) to always have
even-numbered Internal versions. That reserves the odd numbers to slot
in fence versions for each user-defined one.

Release note: None
irfansharif committed Nov 28, 2020
1 parent 8e8ccb1 commit 6787eec
Showing 11 changed files with 244 additions and 178 deletions.
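To make the even/odd fence convention from the commit message concrete, here is a minimal, self-contained Go sketch. The `version` type and `fenceVersionFor` helper below are illustrative stand-ins for `roachpb.Version` and `pkg/migration.fenceVersionFor` from the diff; they only mirror the idea and are not the real implementations.

```go
package main

import "fmt"

// version is a stripped-down stand-in for roachpb.Version, used only to
// illustrate the even/odd fence convention.
type version struct {
	Major, Minor, Internal int32
}

func (v version) String() string {
	if v.Internal == 0 {
		return fmt.Sprintf("%d.%d", v.Major, v.Minor)
	}
	return fmt.Sprintf("%d.%d-%d", v.Major, v.Minor, v.Internal)
}

// fenceVersionFor mirrors the idea behind pkg/migration.fenceVersionFor:
// user-defined versions always carry even-numbered Internal versions, so the
// odd number immediately preceding each one is reserved for its fence.
func fenceVersionFor(v version) version {
	if v.Internal%2 != 0 {
		panic(fmt.Sprintf("only even-numbered internal versions allowed, found %s", v))
	}
	fence := v
	fence.Internal-- // slot the fence in right before the real version
	return fence
}

func main() {
	// A hypothetical intermediate v21.2 cluster version.
	target := version{Major: 21, Minor: 2, Internal: 4}
	fence := fenceVersionFor(target)
	fmt.Printf("real=%s fence=%s(fence)\n", target, fence)
	// Output: real=21.2-4 fence=21.2-3(fence)
}
```

Because a fence's Internal number is odd, the ClusterVersion.PrettyPrint helper added in this commit tags it with a "(fence)" suffix, which is what shows up in the manager's log output.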
1 change: 1 addition & 0 deletions pkg/clusterversion/BUILD.bazel
@@ -33,6 +33,7 @@ go_test(
"//pkg/roachpb",
"//pkg/util/leaktest",
"//vendor/github.com/cockroachdb/redact",
"//vendor/github.com/dustin/go-humanize",
"//vendor/github.com/stretchr/testify/require",
],
)
17 changes: 16 additions & 1 deletion pkg/clusterversion/clusterversion.go
@@ -233,13 +233,28 @@ func (cv ClusterVersion) IsActive(versionKey VersionKey) bool {
return cv.IsActiveVersion(v)
}

func (cv ClusterVersion) String() string { return redact.StringWithoutMarkers(cv) }
func (cv ClusterVersion) String() string {
return redact.StringWithoutMarkers(cv)
}

// SafeFormat implements the redact.SafeFormatter interface.
func (cv ClusterVersion) SafeFormat(p redact.SafePrinter, _ rune) {
p.Print(cv.Version)
}

// PrettyPrint returns the value in a format that makes it apparent whether or
// not it is a fence version.
func (cv ClusterVersion) PrettyPrint() string {
// If we're a version greater than v20.2 and have an odd internal version,
// we're a fence version. See fenceVersionFor in pkg/migration to understand
// what these are.
fenceVersion := !cv.Version.LessEq(roachpb.Version{Major: 20, Minor: 2}) && (cv.Internal%2) == 1
if !fenceVersion {
return cv.String()
}
return redact.Sprintf("%s%s", cv.String(), "(fence)").StripMarkers()
}

// ClusterVersionImpl implements the settings.ClusterVersionImpl interface.
func (cv ClusterVersion) ClusterVersionImpl() {}

17 changes: 17 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
@@ -400,3 +400,20 @@ var (
func VersionByKey(key VersionKey) roachpb.Version {
return versionsSingleton.MustByKey(key)
}

// GetVersionsBetween returns the list of cluster versions in the range
// (from, to].
func GetVersionsBetween(from, to ClusterVersion) []ClusterVersion {
return getVersionBetweenInternal(from, to, versionsSingleton)
}

func getVersionBetweenInternal(from, to ClusterVersion, vs keyedVersions) []ClusterVersion {
var cvs []ClusterVersion
for _, keyedV := range vs {
// Read: "from < keyedV <= to".
if from.Less(keyedV.Version) && keyedV.Version.LessEq(to.Version) {
cvs = append(cvs, ClusterVersion{Version: keyedV.Version})
}
}
return cvs
}
81 changes: 81 additions & 0 deletions pkg/clusterversion/cockroach_versions_test.go
@@ -16,6 +16,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/redact"
"github.com/dustin/go-humanize"
"github.com/stretchr/testify/require"
)

@@ -48,3 +49,83 @@ func TestVersionFormat(t *testing.T) {
t.Errorf("expected %q, got %q", expected, actual)
}
}

func TestClusterVersionPrettyPrint(t *testing.T) {
defer leaktest.AfterTest(t)()

cv := func(major, minor, patch, internal int32) ClusterVersion {
return ClusterVersion{
Version: roachpb.Version{
Major: major,
Minor: minor,
Patch: patch,
Internal: internal,
},
}
}

var tests = []struct {
cv ClusterVersion
exp string
}{
{cv(19, 2, 1, 5), "19.2-5"},
{cv(20, 1, 0, 4), "20.1-4"},
{cv(20, 2, 0, 7), "20.2-7(fence)"},
{cv(20, 2, 0, 4), "20.2-4"},
{cv(20, 2, 1, 5), "20.2-5(fence)"},
{cv(20, 2, 1, 4), "20.2-4"},
}
for _, test := range tests {
if actual := test.cv.PrettyPrint(); actual != test.exp {
t.Errorf("expected %s, got %q", test.exp, actual)
}
}
}

func TestGetVersionsBetween(t *testing.T) {
defer leaktest.AfterTest(t)()

// Define a list of versions v3..v9
var vs keyedVersions
for i := 3; i < 10; i++ {
vs = append(vs, keyedVersion{
Key: VersionKey(42),
Version: roachpb.Version{Major: int32(i)},
})
}
cv := func(major int32) ClusterVersion {
return ClusterVersion{Version: roachpb.Version{Major: major}}
}
list := func(first, last int32) []ClusterVersion {
var cvs []ClusterVersion
for i := first; i <= last; i++ {
cvs = append(cvs, cv(i))
}
return cvs
}

var tests = []struct {
from, to ClusterVersion
exp []ClusterVersion
}{
{cv(5), cv(8), list(6, 8)},
{cv(1), cv(1), []ClusterVersion{}},
{cv(7), cv(7), []ClusterVersion{}},
{cv(1), cv(5), list(3, 5)},
{cv(6), cv(12), list(7, 9)},
{cv(4), cv(5), list(5, 5)},
}

for _, test := range tests {
actual := getVersionBetweenInternal(test.from, test.to, vs)
if len(actual) != len(test.exp) {
t.Errorf("expected %d versions, got %d", len(test.exp), len(actual))
}

for i := range test.exp {
if actual[i] != test.exp[i] {
t.Errorf("%s version incorrect: expected %s, got %s", humanize.Ordinal(i), test.exp[i], actual[i])
}
}
}
}
159 changes: 102 additions & 57 deletions pkg/migration/manager.go
@@ -23,6 +23,7 @@ package migration

import (
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv"
@@ -192,62 +193,34 @@ func NewManager(
// Migrate runs the set of migrations required to upgrade the cluster version
// from the current version to the target one.
func (m *Manager) Migrate(ctx context.Context, from, to clusterversion.ClusterVersion) error {
// TODO(irfansharif): Should we inject every ctx here with specific labels
// for each migration, so they log distinctly?
ctx = logtags.AddTag(ctx, "migration-mgr", nil)
if from == to {
// Nothing to do here.
log.Infof(ctx, "no need to migrate, cluster already at newest version")
return nil
}

// TODO(irfansharif): Should we inject every ctx here with specific labels
// for each migration, so they log distinctly?
ctx = logtags.AddTag(ctx, "migration-mgr", nil)
log.Infof(ctx, "migrating cluster from %s to %s", from, to)

// TODO(irfansharif): We'll need to acquire a lease here and refresh it
// throughout during the migration to ensure mutual exclusion.

// TODO(irfansharif): We'll need to create a system table to store
// in-progress state of long running migrations, for introspection.

// TODO(irfansharif): We'll want to either write to a KV key to record the
// version up until which we've already migrated to, or consult the system
// table mentioned above. Perhaps it makes sense to consult any given
// `StoreClusterVersionKey`, since the manager here will want to push out
// cluster version bumps for vX before attempting to migrate into vX+1.

// TODO(irfansharif): After determining the last completed migration, if
// any, we'll be want to assemble the list of remaining migrations to step
// through to get to targetV.

// TODO(irfansharif): We'll need to introduce fence/noop versions in order
// for the infrastructure here to step through adjacent cluster versions.
// It's instructive to walk through how we expect a version migration from
// v21.1 to v21.2 to take place, and how we would behave in the presence of
// new v21.1 or v21.2 nodes being added to the cluster during.
// - All nodes are running v21.1
// - All nodes are rolled into v21.2 binaries, but with active cluster
// version still as v21.1
// - The first version bump will be into v21.2.0-1noop
// - Validation for setting active cluster version to v21.2.0-1noop first
// checks to see that all nodes are running v21.2 binaries
// Then concurrently:
// - A new node is added to the cluster, but running binary v21.1
// - We try bumping the cluster gates to v21.2.0-1noop
//
// If the v21.1 node manages to sneak in before the version bump, it's
// fine as the version bump is a no-op one. Any subsequent bumps (including
// the "actual" one bumping to v21.2.0) will fail during validation.
//
// If the v21.1 node is only added after v21.2.0-1noop is active, it won't
// be able to actually join the cluster (it'll be prevented by the join
// RPC).
//
// It would be nice to only contain this "fence" version tag within this
// package. Perhaps by defining yet another proto version type, but for
// pkg/migrations internal use only? I think the UX we want for engineers
// defining migrations is that they'd only care about introducing the next
// version key within pkg/clusterversion, and registering a corresponding
// migration for it here.
var clusterVersions = []clusterversion.ClusterVersion{to}
clusterVersions := clusterversion.GetVersionsBetween(from, to)
if len(clusterVersions) == 0 {
// We're attempting to migrate to something that's not defined in cluster
// versions. This only happens in tests, when we're exercising version
// upgrades over non-existent versions (like in the cluster_version
// logictest). These tests explicitly override the
// binary{,MinSupportedVersion} in order to work. End-user attempts to
// do something similar would be caught at the sql layer (also tested in
// the same logictest). We'll just explicitly append the target version
// here instead, so that we're able to actually migrate into it.
clusterVersions = append(clusterVersions, to)
}
log.Infof(ctx, "migrating cluster from %s to %s (stepping through %s)", from, to, clusterVersions)

for _, clusterVersion := range clusterVersions {
h := &Helper{Manager: m}
@@ -257,13 +230,60 @@ func (m *Manager) Migrate(ctx context.Context, from, to clusterversion.ClusterVe
// return. The migration associated with the specific version can assume
// that every node in the cluster has the corresponding version
// activated.
//
// We'll need to first bump the fence version for each intermediate
// cluster version, before bumping the "real" one. Doing so allows us to
// provide the invariant that whenever a cluster version is active, all
// nodes in the cluster (including ones added concurrently during
// version upgrades) are running binaries that know about the version.

{
// First sanity check that we'll actually be able to perform the
// cluster version bump, cluster-wide.
req := &serverpb.ValidateTargetClusterVersionRequest{
ClusterVersion: &clusterVersion,
// The migrations infrastructure makes use of internal fence
// versions when stepping through consecutive versions. It's
// instructive to walk through how we expect a version migration
// from v21.1 to v21.2 to take place, and how we behave in the
// presence of new v21.1 or v21.2 nodes being added to the cluster.
// - All nodes are running v21.1
// - All nodes are rolled into v21.2 binaries, but with active
// cluster version still as v21.1
// - The first version bump will be into v21.2-1(fence), see the
// migration manager above for where that happens
// Then concurrently:
// - A new node is added to the cluster, but running binary v21.1
// - We try bumping the cluster gates to v21.2-1(fence)
//
// If the v21.1 node manages to sneak in before the version bump,
// it's fine as the version bump is a no-op one (all fence versions
// are). Any subsequent bumps (including the "actual" one bumping to
// v21.2) will fail during the validation step where we'll first
// check to see that all nodes are running v21.2 binaries.
//
// If the v21.1 node is only added after v21.2-1(fence) is active,
// it won't be able to actually join the cluster (it'll be prevented
// by the join RPC).
//
// All of which is to say that once we've seen the node list
// stabilize (as EveryNode enforces), any new nodes that can join
// the cluster will run a release that supports the fence version,
// and by design also supports the actual version (which is the
// direct successor of the fence).
fenceVersion := fenceVersionFor(ctx, clusterVersion)
req := &serverpb.BumpClusterVersionRequest{ClusterVersion: &fenceVersion}
op := fmt.Sprintf("bump-cv=%s", req.ClusterVersion.PrettyPrint())
err := h.EveryNode(ctx, op, func(ctx context.Context, client serverpb.MigrationClient) error {
_, err := client.BumpClusterVersion(ctx, req)
return err
})
if err != nil {
return err
}
err := h.EveryNode(ctx, "validate-cv", func(ctx context.Context, client serverpb.MigrationClient) error {
}
{
// Now sanity check that we'll actually be able to perform the real
// cluster version bump, cluster-wide.
req := &serverpb.ValidateTargetClusterVersionRequest{ClusterVersion: &clusterVersion}
op := fmt.Sprintf("validate-cv=%s", req.ClusterVersion.PrettyPrint())
err := h.EveryNode(ctx, op, func(ctx context.Context, client serverpb.MigrationClient) error {
_, err := client.ValidateTargetClusterVersion(ctx, req)
return err
})
Expand All @@ -272,20 +292,19 @@ func (m *Manager) Migrate(ctx context.Context, from, to clusterversion.ClusterVe
}
}
{
req := &serverpb.BumpClusterVersionRequest{
ClusterVersion: &clusterVersion,
}
err := h.EveryNode(ctx, "bump-cv", func(ctx context.Context, client serverpb.MigrationClient) error {
// Finally, bump the real version cluster-wide.
req := &serverpb.BumpClusterVersionRequest{ClusterVersion: &clusterVersion}
op := fmt.Sprintf("bump-cv=%s", req.ClusterVersion.PrettyPrint())
if err := h.EveryNode(ctx, op, func(ctx context.Context, client serverpb.MigrationClient) error {
_, err := client.BumpClusterVersion(ctx, req)
return err
})
if err != nil {
}); err != nil {
return err
}
}

// TODO(irfansharif): We'll want to retrieve the right migration off of
// our registry of migrations, and execute it.
// our registry of migrations, if any, and execute it.
// TODO(irfansharif): We'll want to be able to override which migration
// is retrieved here within tests. We could make the registry be a part
// of the manager, and allow tests to provide their own.
@@ -294,3 +313,29 @@ func (m *Manager) Migrate(ctx context.Context, from, to clusterversion.ClusterVe

return nil
}

// fenceVersionFor constructs the appropriate "fence version" for the given
// cluster version. Fence versions allow the migrations infrastructure to safely
// step through consecutive cluster versions in the presence of nodes (running
// any binary version) being added to the cluster. See the migration manager
// above for intended usage.
//
// Fence versions (and the migrations infrastructure entirely) were introduced
// in the 21.1 release cycle. In the same release cycle, we introduced the
// invariant that new user-defined versions (users being crdb engineers) must
// always have even-numbered Internal versions, thus reserving the odd numbers
// to slot in fence versions for each cluster version. See top-level
// documentation in pkg/clusterversion for more details.
func fenceVersionFor(
ctx context.Context, cv clusterversion.ClusterVersion,
) clusterversion.ClusterVersion {
if (cv.Internal % 2) != 0 {
log.Fatalf(ctx, "only even numbered internal versions allowed, found %s", cv.Version)
}

// We'll pick the odd internal version preceding the cluster version,
// slotting ourselves right before it.
fenceCV := cv
fenceCV.Internal--
return fenceCV
}
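To make the ordering that the Migrate loop above enforces easier to see at a glance, here is a compact sketch of the per-version protocol: fence bump first, then validation, then the real bump. It builds on the `version` and `fenceVersionFor` stand-ins from the earlier sketch; `bumpAll` and `validateAll` are hypothetical callbacks standing in for the EveryNode fan-out of BumpClusterVersion and ValidateTargetClusterVersion, not part of the real API.

```go
// stepThrough sketches the per-version protocol from Migrate above, with the
// RPC plumbing abstracted away behind two callbacks.
func stepThrough(versions []version, bumpAll, validateAll func(version) error) error {
	for _, v := range versions {
		// 1. Bump the fence version everywhere. Once it is active, any node
		//    joining the cluster must run a binary that knows about v.
		if err := bumpAll(fenceVersionFor(v)); err != nil {
			return err
		}
		// 2. Re-validate cluster-wide, catching any node that snuck in while
		//    the (no-op) fence bump was in flight.
		if err := validateAll(v); err != nil {
			return err
		}
		// 3. Only now bump the real version cluster-wide.
		if err := bumpAll(v); err != nil {
			return err
		}
	}
	return nil
}
```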
5 changes: 5 additions & 0 deletions pkg/roachpb/version.go
@@ -43,6 +43,11 @@ func (v Version) Less(otherV Version) bool {
return false
}

// LessEq returns whether the receiver is less than or equal to the parameter.
func (v Version) LessEq(otherV Version) bool {
return v.Equal(otherV) || v.Less(otherV)
}

// String implements the fmt.Stringer interface.
func (v Version) String() string { return redact.StringWithoutMarkers(v) }
