Skip to content

Commit

Permalink
migrate snapshots
Browse files Browse the repository at this point in the history
  • Loading branch information
Tomy2e committed Jul 26, 2024
1 parent 2e36993 commit 3a77452
Show file tree
Hide file tree
Showing 7 changed files with 289 additions and 77 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.22-alpine as builder
FROM golang:1.22-alpine AS builder

RUN apk update && apk add --no-cache git ca-certificates && update-ca-certificates

Expand Down
34 changes: 22 additions & 12 deletions cmd/sbs-migration/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,26 @@ package main
import (
"context"
"flag"
"path/filepath"

"github.com/scaleway/scaleway-csi/pkg/driver"
"github.com/scaleway/scaleway-csi/pkg/migration"
"github.com/scaleway/scaleway-csi/pkg/scaleway"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
"k8s.io/klog/v2"
)

func main() {
ctx := context.Background()

var kubeconfig *string
if home := homedir.HomeDir(); home != "" {
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
} else {
kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
}
dryRun := flag.Bool("dry-run", false, "When set to true, volumes and snapshots will not be migrated")
var (
ctx = context.Background()

// Flags
kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
disableVolumeMigration = flag.Bool("disable-volume-migration", false, "Disables listing volumes and migrating them")
disableSnapshotMigration = flag.Bool("disable-snapshot-migration", false, "Disables listing snapshots and migrating them")
dryRun = flag.Bool("dry-run", false, "Simulates the volume and snapshot migration process")
)
flag.Parse()

// Create Kubernetes client.
Expand All @@ -37,14 +36,25 @@ func main() {
klog.Fatal(err)
}

dynClient, err := dynamic.NewForConfig(config)
if err != nil {
klog.Fatal(err)
}

// Create Scaleway client.
scw, err := scaleway.New(driver.UserAgent())
if err != nil {
klog.Fatal(err)
}

// Migrate volumes and snapshots from Instance to Block API.
if err := migration.New(clientset, scw, *dryRun).Do(ctx); err != nil {
opts := &migration.Options{
DryRun: *dryRun,
DisableVolumeMigration: *disableVolumeMigration,
DisableSnapshotMigration: *disableSnapshotMigration,
}

if err := migration.New(clientset, dynClient, scw, opts).Do(ctx); err != nil {
klog.Fatal(err)
}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
github.com/scaleway/scaleway-sdk-go v1.0.0-beta.26
golang.org/x/crypto v0.22.0
golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f
golang.org/x/sync v0.7.0
golang.org/x/sys v0.19.0
google.golang.org/grpc v1.63.2
google.golang.org/protobuf v1.33.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
241 changes: 191 additions & 50 deletions pkg/migration/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,64 @@ import (
"github.com/avast/retry-go/v4"
"github.com/scaleway/scaleway-csi/pkg/driver"
"github.com/scaleway/scaleway-csi/pkg/scaleway"
"github.com/scaleway/scaleway-sdk-go/api/instance/v1"
"golang.org/x/sync/errgroup"
kerror "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
)

// retryOpts are the default options for retrying requests to Scaleway API.
var retryOpts = []retry.Option{
retry.RetryIf(scaleway.IsTooManyRequestsError),
retry.Delay(1 * time.Second),
const (
maxParallelMigrations = 3
retryDelay = 1 * time.Second
)

var (
volSnapContentRes = schema.GroupVersionResource{Group: "snapshot.storage.k8s.io", Version: "v1", Resource: "volumesnapshotcontents"}

// retryOpts are the default options for retrying requests to Scaleway API.
retryOpts = []retry.Option{
retry.RetryIf(isRetryable),
retry.Delay(retryDelay),
}
)

type Options struct {
DryRun bool
DisableVolumeMigration bool
DisableSnapshotMigration bool
}

// The Migration struct holds a Kubernetes and Scaleway client.
type Migration struct {
k8s kubernetes.Interface
k8sDyn *dynamic.DynamicClient
scw *scaleway.Scaleway
dryRun bool
opts *Options
}

// New returns a new instance of Migration with the specified k8s/scw clients.
func New(k8s kubernetes.Interface, scw *scaleway.Scaleway, dryRun bool) *Migration {
return &Migration{k8s, scw, dryRun}
func New(k8s kubernetes.Interface, k8sDyn *dynamic.DynamicClient, scw *scaleway.Scaleway, opts *Options) *Migration {
return &Migration{k8s, k8sDyn, scw, opts}
}

// isManagedCluster returns true if a Scaleway managed node is found in the k8s cluster.
func (m *Migration) isManagedCluster(ctx context.Context) (bool, error) {
nodes, err := m.k8s.CoreV1().Nodes().List(ctx, v1.ListOptions{})
if err != nil {
return false, fmt.Errorf("failed to list nodes: %w", err)
}

for _, node := range nodes.Items {
if _, ok := node.Labels["k8s.scaleway.com/managed"]; ok {
return true, nil
}
}

return false, nil
}

// listHandles lists handles of PersistentVolumes managed by the Scaleway CSI driver.
Expand All @@ -54,63 +90,165 @@ func (m *Migration) listHandles(ctx context.Context) ([]string, error) {
return handles, nil
}

// migrateHandle migrates a Scaleway volume using the provided handle. If the
// handle is invalid, it is skipped. If the volume does not exist in the Instance
// API, we assume it was already migrated. The first return value is true if the
// volume was effectively migrated.
func (m *Migration) migrateHandle(ctx context.Context, handle string) (bool, error) {
// listSnapshotHandles lists handles of VolumentSnapshotContents managed by the Scaleway CSI driver.
func (m *Migration) listSnapshotHandles(ctx context.Context) ([]string, error) {
volsnapcontents, err := m.k8sDyn.Resource(volSnapContentRes).List(ctx, v1.ListOptions{})
if err != nil {
if kerror.IsNotFound(err) {
klog.Warningf("Could not list VolumeSnapshotContents, CRD is probably missing: %s", err)
return nil, nil
}

return nil, fmt.Errorf("failed to list k8s VolumeSnapshotContents: %w", err)
}

handles := make([]string, 0, len(volsnapcontents.Items))

for _, v := range volsnapcontents.Items {
d, ok, err := unstructured.NestedString(v.Object, "spec", "driver")
if err != nil {
return nil, fmt.Errorf("failed to get driver for %s: %w", v.GetName(), err)
}
// Skip snapshots not managed by this driver.
if !ok || d != driver.DriverName {
continue
}

h, ok, err := unstructured.NestedString(v.Object, "status", "snapshotHandle")
if err != nil {
return nil, fmt.Errorf("failed to get snapshotHandle for %s: %w", v.GetName(), err)
}
// Skip snapshots with missing handle.
if !ok {
continue
}

handles = append(handles, h)
}

return handles, nil
}

type migrateKind string

const (
volumeMigrateKind migrateKind = "volume"
snapshotMigrateKind migrateKind = "snapshot"
)

// migrateHandle migrates a Scaleway volume or snapshot using the provided handle.
// If the handle is invalid, it is skipped. If the volume or snapshot does not exist
// in the Instance API, we assume it was already migrated. The first return value is
// true if the volume or snapshot was effectively migrated.
func (m *Migration) migrateHandle(ctx context.Context, kind migrateKind, handle string) (bool, error) {
id, zone, err := driver.ExtractIDAndZone(handle)
if err != nil {
// Skip migration if handle is not valid.
klog.Warningf("Failed to extract ID and Zone from handle %q, it will not be migrated: %s", handle, err)
return false, nil
}

if _, err = retry.DoWithData(func() (*instance.Volume, error) {
return m.scw.GetLegacyVolume(ctx, id, zone) //nolint:wrapcheck
if err = retry.Do(func() (err error) {
switch kind {
case volumeMigrateKind:
_, err = m.scw.GetLegacyVolume(ctx, id, zone)
case snapshotMigrateKind:
_, err = m.scw.GetLegacySnapshot(ctx, id, zone)
default:
panic(fmt.Sprintf("unknown kind: %s", kind))
}
return
}, retryOpts...); err != nil {
// If legacy volume does not exist, we assume it was already migrated.
// If legacy resource does not exist, we assume it was already migrated.
if scaleway.IsNotFoundError(err) {
return false, nil
}

return false, fmt.Errorf("failed to get legacy volume: %w", err)
return false, fmt.Errorf("failed to get legacy %s: %w", kind, err)
}

if err := retry.Do(func() error {
return m.scw.MigrateLegacyVolume(ctx, id, zone, m.dryRun) //nolint:wrapcheck
switch kind {
case volumeMigrateKind:
return m.scw.MigrateLegacyVolume(ctx, id, zone, m.opts.DryRun) //nolint:wrapcheck
case snapshotMigrateKind:
return m.scw.MigrateLegacySnapshot(ctx, id, zone, m.opts.DryRun) //nolint:wrapcheck
default:
panic(fmt.Sprintf("unknown kind: %s", kind))
}
}, retryOpts...); err != nil {
return false, fmt.Errorf("could not migrate volume with handle %q: %w", handle, err)
return false, fmt.Errorf("could not migrate %s with handle %q: %w", kind, handle, err)
}

return true, nil
}

// isManagedCluster returns true if a Scaleway managed node is found in the k8s cluster.
func (m *Migration) isManagedCluster(ctx context.Context) (bool, error) {
nodes, err := m.k8s.CoreV1().Nodes().List(ctx, v1.ListOptions{})
func (m *Migration) migrate(ctx context.Context, kind migrateKind) error {
// List handles.
var (
handles []string
err error
)
switch kind {
case snapshotMigrateKind:
handles, err = m.listSnapshotHandles(ctx)
case volumeMigrateKind:
handles, err = m.listHandles(ctx)
default:
panic(fmt.Sprintf("unknown kind: %s", kind))
}
if err != nil {
return false, fmt.Errorf("failed to list nodes: %w", err)
return fmt.Errorf("could not list handles: %w", err)
}

for _, node := range nodes.Items {
if _, ok := node.Labels["k8s.scaleway.com/managed"]; ok {
return true, nil
switch kind {
case snapshotMigrateKind:
klog.Infof("Found %d Scaleway VolumeSnapshotContents in the cluster", len(handles))
case volumeMigrateKind:
klog.Infof("Found %d Scaleway PersistentVolumes in the cluster", len(handles))
default:
panic(fmt.Sprintf("unknown kind: %s", kind))
}

// Run handle migrations in parallel.
eg, ctx := errgroup.WithContext(ctx)
eg.SetLimit(maxParallelMigrations)

for _, handle := range handles {
select {
case <-ctx.Done():
default:
eg.Go(func() error {
klog.Infof("Migrating %s with handle %s", kind, handle)

ok, err := m.migrateHandle(ctx, kind, handle)
if err != nil {
return fmt.Errorf("failed to migrate handle %s: %w", handle, err)
}

if ok {
klog.Infof("%s with handle %s was successfully migrated", kind, handle)
} else {
klog.Infof("%s with handle %s was not migrated", kind, handle)
}

return nil
})
}
}

return false, nil
if err := eg.Wait(); err != nil {
return fmt.Errorf("%s migration failed: %w", kind, err)
}

return nil
}

// Do runs the migration of all Scaleway PersistentVolumes from the Instance API
// to the new Block API.
// Do runs the migration of all Scaleway PersistentVolumes and VolumeSnapshotContents
// from the Instance API to the new Block API.
func (m *Migration) Do(ctx context.Context) error {
if m.dryRun {
klog.Infof("Dry run enabled, volumes and snapshots will not be migrated")
}

// Quick check to make sure this tool is not run on a managed cluster.
if os.Getenv("IGNORE_MANAGED_CLUSTER") == "" {
if os.Getenv("IGNORE_MANAGED_CLUSTER") != "true" {
managed, err := m.isManagedCluster(ctx)
if err != nil {
return fmt.Errorf("failed to check if cluster is managed: %w", err)
Expand All @@ -125,27 +263,30 @@ func (m *Migration) Do(ctx context.Context) error {
}
}

handles, err := m.listHandles(ctx)
if err != nil {
return fmt.Errorf("could not list handles: %w", err)
}

klog.Infof("Found %d Scaleway PersistentVolumes in the cluster", len(handles))

for _, handle := range handles {
klog.Infof("Migrating volume with handle %s", handle)
// Migrate volumes first.
if !m.opts.DisableVolumeMigration {
if m.opts.DryRun {
klog.Infof("Dry run enabled, volumes and their associated snapshots will not be migrated")
}

ok, err := m.migrateHandle(ctx, handle)
if err != nil {
return fmt.Errorf("failed to migrate handle %s: %w", handle, err)
if err := m.migrate(ctx, volumeMigrateKind); err != nil {
return err
}
}

if ok {
klog.Infof("Volume with handle %s was successfully migrated", handle)
} else {
klog.Infof("Volume with handle %s was not migrated", handle)
// Migrate snapshots.
if !m.opts.DisableSnapshotMigration {
if m.opts.DryRun {
klog.Infof("Dry run enabled, snapshots and their associated volumes will not be migrated")
}
if err := m.migrate(ctx, snapshotMigrateKind); err != nil {
return err
}
}

return nil
}

func isRetryable(err error) bool {
return scaleway.IsTooManyRequestsError(err) || scaleway.IsInternalServerError(err)
}
Loading

0 comments on commit 3a77452

Please sign in to comment.